http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java
new file mode 100644
index 0000000..d41c325
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTaskWithPredicateSelfTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.failover.*;
+import org.apache.ignite.spi.failover.always.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Test failover of a task with Node filter predicate.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class GridFailoverTaskWithPredicateSelfTest extends 
GridCommonAbstractTest {
+    /** First node's name. */
+    private static final String NODE1 = "NODE1";
+
+    /** Second node's name. */
+    private static final String NODE2 = "NODE2";
+
+    /** Third node's name. */
+    private static final String NODE3 = "NODE3";
+
+    /** Predicate that rejects nodes whose grid name attribute equals {@link #NODE2}, i.e. excludes the second node from topology. */
+    private final IgnitePredicate<ClusterNode> p = new 
IgnitePredicate<ClusterNode>() {
+        @Override
+        public boolean apply(ClusterNode e) {
+            return 
!NODE2.equals(e.attribute(GridNodeAttributes.ATTR_GRID_NAME));
+        }
+    };
+
+    /** Whether delegating fail over node was found or not. */
+    private final AtomicBoolean routed = new AtomicBoolean();
+
+    /** Whether job execution failed with exception. */
+    private final AtomicBoolean failed = new AtomicBoolean();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setFailoverSpi(new AlwaysFailoverSpi() {
+            /** {@inheritDoc} */
+            @Override public ClusterNode failover(FailoverContext ctx, 
List<ClusterNode> grid) {
+                ClusterNode failoverNode = super.failover(ctx, grid);
+
+                if (failoverNode != null)
+                    routed.set(true);
+                else
+                    routed.set(false);
+
+                return failoverNode;
+            }
+        });
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        return cfg;
+    }
+
+    /**
+     * Tests that failover doesn't happen on two-node grid when the Task is applicable only for the first node
+     * and fails on it.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJobNotFailedOver() throws Exception {
+        failed.set(false);
+        routed.set(false);
+
+        try {
+            Ignite ignite1 = startGrid(NODE1);
+            Ignite ignite2 = startGrid(NODE2);
+
+            assert ignite1 != null;
+            assert ignite2 != null;
+
+            compute(ignite1.cluster().forPredicate(p)).withTimeout(10000).execute(JobFailTask.class.getName(), "1");
+        }
+        catch (ClusterTopologyException ignored) {
+            // Expected: the only node accepted by the predicate has failed the job.
+            failed.set(true);
+        }
+        finally {
+            // Cleanup only. Assertions are kept out of this block so that a failing assertion cannot
+            // leave grids running and cannot mask an unexpected exception thrown above.
+            stopGrid(NODE1);
+            stopGrid(NODE2);
+        }
+
+        assertTrue(failed.get());
+        assertFalse(routed.get());
+    }
+
+    /**
+     * Tests that failover happens on three-node grid when the Task is applicable for the first node
+     * and fails on it, but is also applicable on another node.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJobFailedOver() throws Exception {
+        failed.set(false);
+        routed.set(false);
+
+        try {
+            Ignite ignite1 = startGrid(NODE1);
+            Ignite ignite2 = startGrid(NODE2);
+            Ignite ignite3 = startGrid(NODE3);
+
+            assert ignite1 != null;
+            assert ignite2 != null;
+            assert ignite3 != null;
+
+            Integer res = (Integer)compute(ignite1.cluster().forPredicate(p)).withTimeout(10000).
+                execute(JobFailTask.class.getName(), "1");
+
+            assert res == 1;
+        }
+        catch (ClusterTopologyException ignored) {
+            // Not expected here; recorded so that the assertion below reports it.
+            failed.set(true);
+        }
+        finally {
+            // Cleanup only. Assertions are kept out of this block so that a failing assertion cannot
+            // leave grids running and cannot mask an unexpected exception thrown above.
+            stopGrid(NODE1);
+            stopGrid(NODE2);
+            stopGrid(NODE3);
+        }
+
+        assertFalse(failed.get());
+        assertTrue(routed.get());
+    }
+
+    /**
+     * Tests that in case of failover our predicate is intersected with projection
+     * (logical AND is performed).
+     *
+     * @throws Exception If error happens.
+     */
+    public void testJobNotFailedOverWithStaticProjection() throws Exception {
+        failed.set(false);
+        routed.set(false);
+
+        try {
+            Ignite ignite1 = startGrid(NODE1);
+            Ignite ignite2 = startGrid(NODE2);
+            Ignite ignite3 = startGrid(NODE3);
+
+            assert ignite1 != null;
+            assert ignite2 != null;
+            assert ignite3 != null;
+
+            // Get projection only for first 2 nodes.
+            ClusterGroup nodes = ignite1.cluster().forNodeIds(Arrays.asList(
+                ignite1.cluster().localNode().id(),
+                ignite2.cluster().localNode().id()));
+
+            // On failover NODE3 shouldn't be taken into account.
+            Integer res = (Integer)compute(nodes.forPredicate(p)).withTimeout(10000).
+                execute(JobFailTask.class.getName(), "1");
+
+            assert res == 1;
+        }
+        catch (ClusterTopologyException ignored) {
+            // Expected: NODE2 is rejected by the predicate and NODE3 is outside the projection.
+            failed.set(true);
+        }
+        finally {
+            // Cleanup only. Assertions are kept out of this block so that a failing assertion cannot
+            // leave grids running and cannot mask an unexpected exception thrown above.
+            stopGrid(NODE1);
+            stopGrid(NODE2);
+            stopGrid(NODE3);
+        }
+
+        assertTrue(failed.get());
+        assertFalse(routed.get());
+    }
+
+    /** Task with a single job that throws while the "fail" session attribute is true and returns its argument afterwards. */
+    @ComputeTaskSessionFullSupport
+    private static class JobFailTask implements ComputeTask<String, Object> {
+        /** Task session carrying the "fail" attribute that is set in map() and read by the job. */
+        @IgniteTaskSessionResource
+        private ComputeTaskSession ses;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException {
+            ses.setAttribute("fail", true);
+
+            return Collections.singletonMap(new ComputeJobAdapter(arg) {
+                /** {@inheritDoc} */
+                @SuppressWarnings({"RedundantTypeArguments"})
+                @Override
+                public Serializable execute() throws IgniteCheckedException {
+                    boolean fail;
+
+                    try {
+                        fail = ses.<String, Boolean>waitForAttribute("fail", 
0);
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteCheckedException("Got interrupted 
while waiting for attribute to be set.", e);
+                    }
+
+                    if (fail) {
+                        ses.setAttribute("fail", false);
+
+                        throw new IgniteCheckedException("Job exception.");
+                    }
+
+                    // The "fail" attribute was already cleared: return the job argument parsed as an integer.
+                    return Integer.parseInt(this.<String>argument(0));
+                }
+            }, subgrid.get(0));
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> received)
+                throws IgniteCheckedException {
+            if (res.getException() != null && !(res.getException() instanceof 
ComputeUserUndeclaredException))
+                return ComputeJobResultPolicy.FAILOVER;
+
+            return ComputeJobResultPolicy.REDUCE;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) throws 
IgniteCheckedException {
+            assert results.size() == 1;
+
+            return results.get(0).getData();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java
new file mode 100644
index 0000000..3a99f4e
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridFailoverTopologySelfTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.failover.*;
+import org.apache.ignite.spi.failover.always.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests failover and topology: failover must not pick the local node if it has been excluded from topology.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class GridFailoverTopologySelfTest extends GridCommonAbstractTest {
+    /** */
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+
+    /** Passes {@code false} to the parent constructor, so no grid is started automatically (per the inline argument comment). */
+    public GridFailoverTopologySelfTest() {
+        super(/*start Grid*/false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setNodeId(null);
+
+        cfg.setFailoverSpi(new AlwaysFailoverSpi() {
+            /** Ignite instance. */
+            @IgniteInstanceResource
+            private Ignite ignite;
+
+            /** {@inheritDoc} */
+            @Override public ClusterNode failover(FailoverContext ctx, 
List<ClusterNode> grid) {
+                if (grid.size() != 1) {
+                    failed.set(true);
+
+                    error("Unexpected grid size [expected=1, grid=" + grid + 
']');
+                }
+
+                UUID locNodeId = ignite.configuration().getNodeId();
+
+                for (ClusterNode node : grid) {
+                    if (node.id().equals(locNodeId)) {
+                        failed.set(true);
+
+                        error("Grid shouldn't contain local node 
[localNodeId=" + locNodeId + ", grid=" + grid + ']');
+                    }
+                }
+
+                return super.failover(ctx, grid);
+            }
+        });
+
+        return cfg;
+    }
+
+    /**
+     * Tests that failover doesn't pick the local node if it has been excluded from 
topology.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testFailoverTopology() throws Exception {
+        try {
+            Ignite ignite1 = startGrid(1);
+
+            startGrid(2);
+
+            ignite1.compute().localDeployTask(JobFailTask.class, 
JobFailTask.class.getClassLoader());
+
+            try {
+                
compute(ignite1.cluster().forRemotes()).execute(JobFailTask.class, null);
+            }
+            catch (IgniteCheckedException e) {
+                info("Got expected grid exception: " + e);
+            }
+
+            assert !failed.get();
+        }
+        finally {
+            stopGrid(1);
+            stopGrid(2);
+        }
+    }
+
+    /** Task that maps one always-failing job to a non-local node and requests failover at most once. */
+    private static class JobFailTask implements ComputeTask<String, Object> {
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Set once failover has been requested, so that it is not requested again. */
+        private boolean jobFailedOver;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String arg) throws IgniteCheckedException {
+            assert ignite != null;
+
+            UUID locId = ignite.configuration().getNodeId();
+
+            assert locId != null;
+
+            // Keep the last node of the subgrid that is not the local one.
+            ClusterNode target = null;
+
+            for (ClusterNode candidate : subgrid) {
+                if (candidate.id().equals(locId))
+                    continue;
+
+                target = candidate;
+            }
+
+            ComputeJobAdapter failingJob = new ComputeJobAdapter(arg) {
+                @Override public Serializable execute() throws IgniteCheckedException {
+                    throw new IgniteCheckedException("Job exception.");
+                }
+            };
+
+            return Collections.singletonMap(failingJob, target);
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) throws IgniteCheckedException {
+            // Successful result, or failover was already requested once: go straight to reduce.
+            if (res.getException() == null || jobFailedOver)
+                return ComputeJobResultPolicy.REDUCE;
+
+            jobFailedOver = true;
+
+            return ComputeJobResultPolicy.FAILOVER;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+            assert results.size() == 1;
+
+            ComputeJobResult only = results.get(0);
+
+            return only.getData();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java
new file mode 100644
index 0000000..f3cf621
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridHomePathSelfTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.IgniteSystemProperties.*;
+
+/**
+ * Tests starting a node with an overridden home path in its configuration.
+ */
+public class GridHomePathSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Bind the node to the local host supplied by the test resources.
+        String locHost = getTestResources().getLocalHost();
+
+        cfg.setLocalHost(locHost);
+
+        return cfg;
+    }
+
+    /**
+     * Starts a second node with an overridden home path, which is expected to fail, and then with the
+     * home read from the system property named by {@code GG_HOME}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHomeOverride() throws Exception {
+        try {
+            startGrid(0);
+
+            // Home override: the start is expected to be rejected.
+            IgniteConfiguration overridden = getConfiguration(getTestGridName(1));
+
+            overridden.setGridGainHome("/new/path");
+
+            try {
+                G.start(overridden);
+
+                assert false : "Exception should have been thrown.";
+            }
+            catch (Exception e) {
+                // Anything not caused by an IgniteException is unexpected and is rethrown.
+                if (!X.hasCause(e, IgniteException.class))
+                    throw e;
+
+                info("Caught expected exception: " + e);
+            }
+
+            // No override: home comes from the system property.
+            IgniteConfiguration sameHome = getConfiguration(getTestGridName(1));
+
+            sameHome.setGridGainHome(System.getProperty(GG_HOME));
+
+            G.start(sameHome);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java
new file mode 100644
index 0000000..ea2d4bc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCheckpointCleanupSelfTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.checkpoint.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Test for checkpoint cleanup.
+ */
+public class GridJobCheckpointCleanupSelfTest extends GridCommonAbstractTest {
+    /** Number of currently alive checkpoints. */
+    private final AtomicInteger cntr = new AtomicInteger();
+
+    /** Checkpoint. */
+    private CheckpointSpi checkpointSpi;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Uses whatever SPI instance the test has put into the field before starting the grid.
+        cfg.setCheckpointSpi(checkpointSpi);
+
+        return cfg;
+    }
+
+    /**
+     * Spawns one job on the node other than task node and
+     * ensures that all checkpoints were removed after task completion.
+     *
+     * @throws Exception if failed.
+     */
+    public void testCheckpointCleanup() throws Exception {
+        try {
+            // Each grid gets its own SPI instance; both share the same counter.
+            checkpointSpi = new TestCheckpointSpi("task-checkpoints", cntr);
+
+            Ignite taskIgnite = startGrid(0);
+
+            checkpointSpi = new TestCheckpointSpi("job-checkpoints", cntr);
+
+            Ignite jobIgnite = startGrid(1);
+
+            taskIgnite.compute().execute(new CheckpointCountingTestTask(), jobIgnite.cluster().localNode());
+        }
+        finally {
+            stopAllGrids();
+        }
+
+        // Expected value goes first so that a failure message reports expected/actual correctly.
+        assertEquals(0, cntr.get());
+    }
+
+    /**
+     * Test checkpoint SPI that stores nothing and only counts save and remove calls.
+     */
+    @IgniteSpiMultipleInstancesSupport(true)
+    private static class TestCheckpointSpi extends IgniteSpiAdapter implements 
CheckpointSpi {
+        /** Counter: incremented on every save, decremented on every remove. */
+        private final AtomicInteger cntr;
+
+        /**
+         * @param name SPI name, passed to {@code setName}.
+         * @param cntr Counter updated by save and remove calls.
+         */
+        TestCheckpointSpi(String name, AtomicInteger cntr) {
+            setName(name);
+
+            this.cntr = cntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte[] loadCheckpoint(String key) throws 
IgniteSpiException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean saveCheckpoint(String key, byte[] state, long 
timeout, boolean overwrite)
+            throws IgniteSpiException {
+            cntr.incrementAndGet();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean removeCheckpoint(String key) {
+            cntr.decrementAndGet();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setCheckpointListener(CheckpointListener lsnr) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(@Nullable String gridName) throws 
IgniteSpiException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // No-op.
+        }
+    }
+
+    /**
+     * Task that maps a single checkpoint-saving job to the node passed as the task argument.
+     */
+    @ComputeTaskSessionFullSupport
+    private static class CheckpointCountingTestTask extends ComputeTaskAdapter<ClusterNode, Object> {
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable ClusterNode arg)
+            throws IgniteCheckedException {
+            for (ClusterNode candidate : subgrid) {
+                if (!candidate.id().equals(arg.id()))
+                    continue;
+
+                // Found the requested node: map the only job to it.
+                return Collections.singletonMap(new ComputeJobAdapter() {
+                    @IgniteTaskSessionResource
+                    private ComputeTaskSession ses;
+
+                    @Nullable @Override public Object execute() throws IgniteCheckedException {
+                        ses.saveCheckpoint("checkpoint-key", "checkpoint-value");
+
+                        return null;
+                    }
+                }, candidate);
+            }
+
+            assert false : "Expected node wasn't found in grid";
+
+            // Never accessible.
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java
new file mode 100644
index 0000000..bf967a6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobCollisionCancelSelfTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.collision.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Job collision cancel test.
+ */
+@SuppressWarnings( {"PublicInnerClass"})
+@GridCommonTest(group = "Kernal Self")
+public class GridJobCollisionCancelSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final Object mux = new Object();
+
+    /** */
+    private static final int SPLIT_COUNT = 2;
+
+    /** */
+    private static final long maxJobExecTime = 10000;
+
+    /** */
+    private static int cancelCnt;
+
+    /** */
+    private static int execCnt;
+
+    /** */
+    private static int colResolutionCnt;
+
+    /** Passes {@code true} to the parent test constructor (presumably requesting an automatically started grid - confirm). */
+    public GridJobCollisionCancelSelfTest() {
+        super(true);
+    }
+
+    /** Runs the cancel test task and checks the execution, cancel and collision-resolution counters.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings( {"AssignmentToCatchBlockParameter"})
+    public void testCancel() throws Exception {
+        Ignite ignite = G.ignite(getTestGridName());
+
+        ignite.compute().localDeployTask(GridCancelTestTask.class, 
GridCancelTestTask.class.getClassLoader());
+
+        ComputeTaskFuture<?> res0 =
+            executeAsync(ignite.compute().withTimeout(maxJobExecTime * 2), 
GridCancelTestTask.class.getName(), null);
+
+        try {
+            Object res = res0.get();
+
+            info("Cancel test result: " + res);
+
+            synchronized (mux) {
+                // No more executions than there are jobs.
+                assert execCnt <= SPLIT_COUNT : "Invalid execute count: " + 
execCnt;
+
+                // Each job returns 1 if it was cancelled, so the sum cannot exceed the number of jobs.
+                assert (Integer)res <= SPLIT_COUNT  : "Invalid task result: " 
+ res;
+
+                // No more cancellations than there are jobs.
+                assert cancelCnt <= SPLIT_COUNT : "Invalid cancel count: " + 
cancelCnt;
+
+                // One per start and one per stop and some that come with 
heartbeats.
+                assert colResolutionCnt > SPLIT_COUNT + 1:
+                    "Invalid collision resolution count: " + colResolutionCnt;
+            }
+        }
+        catch (ComputeTaskTimeoutException e) {
+            error("Task execution got timed out.", e);
+        }
+        catch (Exception e) {
+            assert e.getCause() != null;
+
+            if (e.getCause() instanceof IgniteCheckedException)
+                e = (Exception)e.getCause();
+
+            if (e.getCause() instanceof IOException)
+                e = (Exception)e.getCause();
+
+            assert e.getCause() instanceof InterruptedException : "Invalid 
exception cause: " + e.getCause();
+        }
+    }
+
+    /**
+     * Builds the default test configuration with the test collision SPI installed.
+     *
+     * @return Configuration.
+     * @throws Exception If failed.
+     */
+    @Override protected IgniteConfiguration getConfiguration() throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration();
+
+        GridTestCollision collisionSpi = new GridTestCollision();
+
+        cfg.setCollisionSpi(collisionSpi);
+
+        return cfg;
+    }
+
+    /**
+     * Task that splits into {@code SPLIT_COUNT} cancellable jobs and sums their integer results.
+     */
+    public static class GridCancelTestTask extends ComputeTaskSplitAdapter<Serializable, Object> {
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public Collection<? extends ComputeJob> split(int gridSize, Serializable arg) {
+            if (log.isInfoEnabled())
+                log.info("Splitting task [task=" + this + ", gridSize=" + gridSize + ", arg=" + arg + ']');
+
+            Collection<GridCancelTestJob> jobs = new ArrayList<>(SPLIT_COUNT);
+
+            int created = 0;
+
+            while (created < SPLIT_COUNT) {
+                jobs.add(new GridCancelTestJob());
+
+                created++;
+            }
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) {
+            if (log.isInfoEnabled())
+                log.info("Aggregating job [job=" + this + ", results=" + results + ']');
+
+            int sum = 0;
+
+            for (ComputeJobResult jobRes : results) {
+                assert jobRes != null;
+
+                // Jobs without data contribute nothing to the sum.
+                if (jobRes.getData() == null)
+                    continue;
+
+                sum += (Integer)jobRes.getData();
+            }
+
+            return sum;
+        }
+    }
+
+    /**
+     * Test job that blocks until it is cancelled or until {@code maxJobExecTime} milliseconds have passed
+     * since its creation, and returns {@code 1} if it was cancelled and {@code 0} otherwise.
+     */
+    public static class GridCancelTestJob extends ComputeJobAdapter {
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** */
+        @IgniteJobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** Cancellation flag; read and written only while holding {@code mux}. */
+        private boolean isCancelled;
+
+        /** Wall-clock time after which the job stops waiting for cancellation. */
+        private final long thresholdTime;
+
+        /** */
+        public GridCancelTestJob() {
+            thresholdTime = System.currentTimeMillis() + maxJobExecTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Serializable execute() {
+            synchronized (mux) {
+                execCnt++;
+            }
+
+            if (log.isInfoEnabled())
+                log.info("Executing job: " + jobCtx.getJobId());
+
+            // The flag check and the wait happen under the same monitor that cancel() uses. Otherwise a
+            // cancel() arriving between the check and the wait would be missed and the job would sleep
+            // until the threshold time.
+            synchronized (mux) {
+                long now = System.currentTimeMillis();
+
+                while (!isCancelled && now < thresholdTime) {
+                    try {
+                        // The timeout is always positive here because now < thresholdTime.
+                        mux.wait(thresholdTime - now);
+                    }
+                    catch (InterruptedException ignored) {
+                        // No-op: keep waiting until cancel() is called or the threshold passes.
+                    }
+
+                    now = System.currentTimeMillis();
+                }
+
+                return isCancelled ? 1 : 0;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            synchronized (mux) {
+                isCancelled = true;
+
+                cancelCnt++;
+
+                // mux is shared by all jobs, so every waiting job wakes up and re-checks its own flag.
+                mux.notifyAll();
+            }
+
+            log.warning("Job cancelled: " + jobCtx.getJobId());
+        }
+    }
+
+
+    /**
+     * Test collision SPI that counts collision resolutions, activates every waiting job and cancels every active job.
+     */
+    @IgniteSpiMultipleInstancesSupport(true)
+    public static class GridTestCollision extends IgniteSpiAdapter implements 
CollisionSpi {
+        /** */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public void onCollision(CollisionContext ctx) {
+            Collection<CollisionJobContext> activeJobs = ctx.activeJobs();
+            Collection<CollisionJobContext> waitJobs = ctx.waitingJobs();
+
+            synchronized (mux) {
+                colResolutionCnt++;
+            }
+
+            for (CollisionJobContext job : waitJobs)
+                job.activate();
+
+            for (CollisionJobContext job : activeJobs)
+                job.cancel();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStart(String gridName) throws 
IgniteSpiException {
+            // Start SPI start stopwatch.
+            startStopwatch();
+
+            // Ack start.
+            if (log.isInfoEnabled())
+                log.info(startInfo());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void spiStop() throws IgniteSpiException {
+            // Ack stop.
+            if (log.isInfoEnabled())
+                log.info(stopInfo());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void 
setExternalCollisionListener(CollisionExternalListener lsnr) {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java
new file mode 100644
index 0000000..d0c1a70
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobContextSelfTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Job context test.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class GridJobContextSelfTest extends GridCommonAbstractTest {
+    /**
+     * Starts grids 1 and 2, executes {@link JobContextTask} through the compute facade of grid 1
+     * and stops both grids afterwards, even if the task throws.
+     *
+     * @throws Exception If anything failed.
+     */
+    public void testJobContext() throws Exception {
+        Ignite ignite = startGrid(1);
+
+        try {
+            startGrid(2);
+
+            try {
+                ignite.compute().execute(JobContextTask.class, null);
+            }
+            finally {
+                stopGrid(2);
+            }
+        }
+        finally{
+            stopGrid(1);
+        }
+    }
+
+    /**
+     * Task that creates {@code gridSize} jobs in {@code split}. Each job stores the local node ID,
+     * its own job ID and ten extra string attributes (job ID with a digit {@code 0..9} appended,
+     * used as both key and value) in its job context and asserts that they can be read back.
+     * {@code reduce} repeats the attribute checks against the job context that comes with every
+     * job result and returns {@code null}.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class JobContextTask extends ComputeTaskSplitAdapter<Object, 
Object> {
+        @Override protected Collection<? extends ComputeJob> split(int 
gridSize, Object arg) throws IgniteCheckedException {
+            Collection<ComputeJobAdapter> jobs = new ArrayList<>(gridSize);
+
+            for (int i = 0; i < gridSize; i++) {
+                jobs.add(new ComputeJobAdapter() {
+                    /** Job context (field is annotated with {@code @IgniteJobContextResource}). */
+                    @IgniteJobContextResource
+                    private ComputeJobContext jobCtx;
+
+                    /** Ignite instance. */
+                    @IgniteInstanceResource
+                    private Ignite ignite;
+
+                    /**
+                     * {@inheritDoc}
+                     * <p>
+                     * Writes the attributes one by one and in bulk, then asserts that both the
+                     * single-attribute and the whole-map accessors expose them. Always returns
+                     * {@code null}.
+                     */
+                    @Override public Serializable execute() {
+                        UUID locNodeId = ignite.configuration().getNodeId();
+
+                        jobCtx.setAttribute("nodeId", locNodeId);
+                        jobCtx.setAttribute("jobId", jobCtx.getJobId());
+
+                        Map<String, String> attrs = new HashMap<>(10);
+
+                        for (int i = 0; i < 10; i++) {
+                            String s = jobCtx.getJobId().toString() + i;
+
+                            attrs.put(s, s);
+                        }
+
+                        jobCtx.setAttributes(attrs);
+
+                        assert jobCtx.getAttribute("nodeId").equals(locNodeId);
+                        assert 
jobCtx.getAttributes().get("nodeId").equals(locNodeId);
+                        assert 
jobCtx.getAttributes().keySet().containsAll(attrs.keySet());
+                        assert 
jobCtx.getAttributes().values().containsAll(attrs.values());
+
+                        return null;
+                    }
+                });
+            }
+
+            return jobs;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * For every result, asserts that the {@code "nodeId"} attribute equals the ID of the node
+         * reported by the result, that {@code "jobId"} equals the context's job ID, and that the
+         * ten generated string attributes are present.
+         */
+        @Override public Object reduce(List<ComputeJobResult> results) throws 
IgniteCheckedException {
+            for (ComputeJobResult res : results) {
+                ComputeJobContext jobCtx = res.getJobContext();
+
+                assert 
jobCtx.getAttribute("nodeId").equals(res.getNode().id());
+                assert 
jobCtx.getAttributes().get("nodeId").equals(res.getNode().id());
+
+                assert jobCtx.getAttribute("jobId").equals(jobCtx.getJobId());
+
+                for (int i = 0; i < 10; i++) {
+                    String s = jobCtx.getJobId().toString() + i;
+
+                    assert jobCtx.getAttribute(s).equals(s);
+                    assert jobCtx.getAttributes().get(s).equals(s);
+                }
+            }
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
new file mode 100644
index 0000000..cb51453
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java
@@ -0,0 +1,802 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+
+/**
+ * Test behavior of jobs when master node has failed, but job class implements 
{@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}
+ * interface.
+ */
+@GridCommonTest(group = "Task Session")
+public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest {
+    /** Total grid count within the cloud. */
+    private static final int GRID_CNT = 2;
+
+    /** Default IP finder for single-JVM cloud grid. */
+    private static final TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Counts how many times master-leave interface implementation was 
called. */
+    private static volatile CountDownLatch invokeLatch;
+
+    /** Latch which blocks job execution until main thread has sent node fail 
signal. */
+    private static volatile CountDownLatch latch;
+
+    /** Latch which blocks main thread until all jobs start their execution. */
+    private static volatile CountDownLatch jobLatch;
+
+    /** Should job wait for callback. */
+    private static volatile boolean awaitMasterLeaveCallback = true;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resets the shared static state before every test: jobs wait for the master-leave callback
+     * again, {@code latch} becomes a fresh single-count latch, and {@code jobLatch} and
+     * {@code invokeLatch} are sized to {@code GRID_CNT - 1}.
+     */
+    @Override protected void beforeTest() throws Exception {
+        awaitMasterLeaveCallback = true;
+        latch = new CountDownLatch(1);
+        jobLatch = new CountDownLatch(GRID_CNT - 1);
+        invokeLatch  = new CountDownLatch(GRID_CNT - 1);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * On top of the base configuration this sets a TCP discovery SPI that uses the shared
+     * {@code ipFinder}, the test {@link CommunicationSpi}, an {@code IgniteOptimizedMarshaller}
+     * created with {@code false}, and the default cache configuration switched to
+     * {@code PARTITIONED} mode.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        cfg.setCommunicationSpi(new CommunicationSpi());
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Get predicate which allows task execution on all nodes except the last one.
+     * <p>
+     * The predicate compares the candidate node ID with the local node ID of
+     * {@code grid(GRID_CNT - 1)}; that grid is looked up on every {@code apply} call.
+     *
+     * @return Predicate.
+     */
+    private IgnitePredicate<ClusterNode> excludeLastPredicate() {
+        return new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode e) {
+                return !e.id().equals(grid(GRID_CNT - 1).localNode().id());
+            }
+        };
+    }
+
+    /**
+     * Constructor.
+     */
+    public GridJobMasterLeaveAwareSelfTest() {
+        super(/* don't start grid */ false);
+    }
+
+    /**
+     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked on
+     * job which is initiated by master and is currently running on it.
+     * <p>
+     * A single grid runs a one-job {@link TestTask} asynchronously. Once the job has signalled
+     * {@code jobLatch}, a helper thread is started that releases {@code latch} after
+     * {@code U.sleep(500)}, and the grid is stopped via {@code stopGrid(0, true)}. {@code latch} is
+     * counted down once more after the stop returns, and the callback must then arrive within
+     * 5000 ms.
+     * NOTE(review): the helper thread presumably keeps the job from staying parked on {@code latch}
+     * for the whole duration of the stop - confirm.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLocalJobOnMaster() throws Exception {
+        invokeLatch  = new CountDownLatch(1);
+        jobLatch = new CountDownLatch(1);
+
+        Ignite g = startGrid(0);
+
+        g.compute().enableAsync().execute(new TestTask(1), null);
+
+        jobLatch.await();
+
+        // Count down the latch in a separate thread.
+        new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    U.sleep(500);
+                }
+                catch (IgniteInterruptedException ignore) {
+                    // No-op.
+                }
+
+                latch.countDown();
+            }
+        }).start();
+
+        stopGrid(0, true);
+
+        latch.countDown();
+
+        assert invokeLatch.await(5000, MILLISECONDS);
+    }
+
+    /**
+     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked
+     * when master node leaves topology normally.
+     * <p>
+     * Starts {@code GRID_CNT} grids, launches a {@link TestTask} asynchronously from the last grid
+     * on a projection that excludes that grid, waits until the jobs have signalled {@code jobLatch},
+     * stops the last grid, releases {@code latch} and expects {@code invokeLatch} to reach zero
+     * within 5000 ms.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMasterStoppedNormally() throws Exception {
+        // Start grids.
+        for (int i = 0; i < GRID_CNT; i++)
+            startGrid(i);
+
+        int lastGridIdx = GRID_CNT - 1;
+
+        
compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync().
+            execute(new TestTask(GRID_CNT - 1), null);
+
+        jobLatch.await();
+
+        stopGrid(lastGridIdx, true);
+
+        latch.countDown();
+
+        assert invokeLatch.await(5000, MILLISECONDS);
+    }
+
+    /**
+     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked
+     * when master node leaves topology abruptly (e.g. due to a network failure or immediate node
+     * shutdown).
+     * <p>
+     * Same flow as the normal-stop test, except that {@code blockMessages()} is called on the
+     * communication SPI of the last (master) grid before it is stopped, so that SPI no longer
+     * passes outgoing messages on to the underlying TCP implementation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMasterStoppedAbruptly() throws Exception {
+        // Start grids.
+        for (int i = 0; i < GRID_CNT; i++)
+            startGrid(i);
+
+        int lastGridIdx = GRID_CNT - 1;
+
+        
compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync().
+            execute(new TestTask(GRID_CNT - 1), null);
+
+        jobLatch.await();
+
+        
((CommunicationSpi)grid(lastGridIdx).configuration().getCommunicationSpi()).blockMessages();
+
+        stopGrid(lastGridIdx, true);
+
+        latch.countDown();
+
+        assert invokeLatch.await(5000, MILLISECONDS);
+    }
+
+    /**
+     * Ensure that {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} callback is invoked
+     * when a worker fails to send {@link GridJobExecuteResponse} to the master node.
+     * <p>
+     * Jobs do not wait for the callback here ({@code awaitMasterLeaveCallback} is {@code false}), so
+     * they finish as soon as {@code latch} is released. Before that, every worker's communication
+     * SPI is switched to wait mode so that sending the job response parks on the SPI wait latch.
+     * The test waits until each worker has reached that point, stops the master grid and only then
+     * releases the wait latches; the callback must arrive within 5000 ms.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCannotSendJobExecuteResponse() throws Exception {
+        awaitMasterLeaveCallback = false;
+
+        // Start grids.
+        for (int i = 0; i < GRID_CNT; i++)
+            startGrid(i);
+
+        int lastGridIdx = GRID_CNT - 1;
+
+        
compute(grid(lastGridIdx).forPredicate(excludeLastPredicate())).enableAsync().
+            execute(new TestTask(GRID_CNT - 1), null);
+
+        jobLatch.await();
+
+        for (int i = 0; i < lastGridIdx; i++)
+            
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).waitLatch();
+
+        latch.countDown();
+
+        // Ensure that all worker nodes have already started job response 
sending.
+        for (int i = 0; i < lastGridIdx; i++)
+            
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).awaitResponse();
+
+        // Now we stop master grid.
+        stopGrid(lastGridIdx, true);
+
+        // Release communication SPI wait latches. As master node is stopped, 
job worker will receive and exception.
+        for (int i = 0; i < lastGridIdx; i++)
+            
((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch();
+
+        assert invokeLatch.await(5000, MILLISECONDS);
+    }
+
+    /**
+     * Checks the master-leave callback for one {@link TestClosure} started asynchronously via
+     * {@code IgniteCompute.apply(closure, arg)}; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testApply1() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(grid).enableAsync();
+
+                comp.apply(new TestClosure(), "arg");
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestClosure} applied asynchronously to a
+     * two-element argument list; two jobs are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testApply2() throws Exception {
+        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(grid).enableAsync();
+
+                comp.apply(new TestClosure(), Arrays.asList("arg1", "arg2"));
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestClosure} applied asynchronously to a
+     * two-element argument list together with a reducer that accepts every result and reduces to
+     * {@code null}; two jobs are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testApply3() throws Exception {
+        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup grid) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(grid).enableAsync();
+
+                comp.apply(new TestClosure(),
+                    Arrays.asList("arg1", "arg2"),
+                    new IgniteReducer<Void, Object>() {
+                        @Override public boolean collect(@Nullable Void aVoid) 
{
+                            return true;
+                        }
+
+                        @Override public Object reduce() {
+                            return null;
+                        }
+                    });
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for one {@link TestRunnable} started asynchronously via
+     * {@code IgniteCompute.run(runnable)}; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRun1() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.run(new TestRunnable());
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a list of two {@link TestRunnable}s started
+     * asynchronously via {@code IgniteCompute.run(collection)}; two jobs are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRun2() throws Exception {
+        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.run(Arrays.asList(new TestRunnable(), new 
TestRunnable()));
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for one {@link TestCallable} started asynchronously via
+     * {@code IgniteCompute.call(callable)}; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCall1() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.call(new TestCallable());
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a list of two {@link TestCallable}s started
+     * asynchronously via {@code IgniteCompute.call(collection)}; two jobs are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCall2() throws Exception {
+        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.call(Arrays.asList(new TestCallable(), new 
TestCallable()));
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a list of two {@link TestCallable}s started
+     * asynchronously together with a reducer that accepts every result and reduces to
+     * {@code null}; two jobs are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCall3() throws Exception {
+        testMasterLeaveAwareCallback(2, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.call(
+                    Arrays.asList(new TestCallable(), new TestCallable()),
+                    new IgniteReducer<Void, Object>() {
+                        @Override public boolean collect(@Nullable Void aVoid) 
{
+                            return true;
+                        }
+
+                        @Override public Object reduce() {
+                            return null;
+                        }
+                    });
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestRunnable} broadcast asynchronously over the
+     * projection; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBroadcast1() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.broadcast(new TestRunnable());
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestCallable} broadcast asynchronously over the
+     * projection; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBroadcast2() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.broadcast(new TestCallable());
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestClosure} broadcast asynchronously with the
+     * argument {@code "arg"}; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBroadcast3() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                comp.broadcast(new TestClosure(), "arg");
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestRunnable} started asynchronously via
+     * {@code affinityRun} on the default ({@code null}-named) cache, using a key for which the
+     * first node of the projection is primary; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinityRun() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                GridCacheAffinity<Object> aff = 
prj.ignite().cache(null).affinity();
+
+                ClusterNode node = F.first(prj.nodes());
+
+                comp.affinityRun(null, keyForNode(aff, node), new 
TestRunnable());
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Checks the master-leave callback for a {@link TestCallable} started asynchronously via
+     * {@code affinityCall} on the default ({@code null}-named) cache, using a key for which the
+     * first node of the projection is primary; one job is expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAffinityCall() throws Exception {
+        testMasterLeaveAwareCallback(1, new CX1<ClusterGroup, 
IgniteFuture<?>>() {
+            @Override public IgniteFuture<?> applyx(ClusterGroup prj) throws 
IgniteCheckedException {
+                IgniteCompute comp = compute(prj).enableAsync();
+
+                GridCacheAffinity<Object> aff = 
prj.ignite().cache(null).affinity();
+
+                ClusterNode node = F.first(prj.nodes());
+
+                comp.affinityCall(null, keyForNode(aff, node), new 
TestCallable());
+
+                return comp.future();
+            }
+        });
+    }
+
+    /**
+     * Finds a cache key for which the given node is primary by probing the integers
+     * {@code 0} to {@code 999} in ascending order. The test fails (via {@code assertNotNull}) if the
+     * node is {@code null} or if none of the probed keys maps to the node.
+     *
+     * @param aff Cache affinity.
+     * @param node Node.
+     * @return The first probed key for which the given node is primary.
+     */
+    private Object keyForNode(GridCacheAffinity<Object> aff, ClusterNode node) 
{
+        assertNotNull(node);
+
+        Object key = null;
+
+        for (int i = 0; i < 1000; i++) {
+            if (aff.isPrimary(node, i)) {
+                key = i;
+
+                break;
+            }
+        }
+
+        assertNotNull(key);
+
+        return key;
+    }
+
+    /**
+     * Common scenario for the closure-style tests: starts {@code GRID_CNT} grids, lets
+     * {@code taskStarter} launch work from the last grid on a projection that excludes that grid,
+     * waits until {@code expJobs} jobs have signalled {@code jobLatch}, stops the last (master) grid,
+     * releases {@code latch} and expects {@code expJobs} master-leave callbacks within 5000 ms.
+     * Finally the task future is awaited; a failure of the task itself is tolerated and only logged.
+     *
+     * @param expJobs Expected jobs number.
+     * @param taskStarter Task starter.
+     * @throws Exception If failed.
+     */
+    private void testMasterLeaveAwareCallback(int expJobs, IgniteClosure<ClusterGroup, IgniteFuture<?>> taskStarter)
+        throws Exception {
+        jobLatch = new CountDownLatch(expJobs);
+        invokeLatch = new CountDownLatch(expJobs);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            startGrid(i);
+
+        int lastGridIdx = GRID_CNT - 1;
+
+        IgniteFuture<?> fut = taskStarter.apply(grid(lastGridIdx).forPredicate(excludeLastPredicate()));
+
+        jobLatch.await();
+
+        stopGrid(lastGridIdx, true);
+
+        latch.countDown();
+
+        assert invokeLatch.await(5000, MILLISECONDS);
+
+        try {
+            fut.get();
+        }
+        catch (IgniteCheckedException e) {
+            // Guard the DEBUG call the same way the INFO calls in this code base are guarded.
+            if (log.isDebugEnabled())
+                log.debug("Task failed: " + e);
+        }
+    }
+
+    /**
+     * Shared job logic used by the test callable, runnable, closure and job. It coordinates with
+     * the test thread through the static {@code jobLatch}, {@code latch} and {@code invokeLatch}
+     * and keeps a per-instance latch that is released by the master-leave callback.
+     */
+    private static class TestMasterLeaveAware {
+        /** Per-instance latch released by {@code onMasterLeave}. */
+        private final CountDownLatch latch0 = new CountDownLatch(1);
+
+        /**
+         * Job body: counts down {@code jobLatch}, blocks until {@code latch} is released and then,
+         * if {@code awaitMasterLeaveCallback} is set, blocks until {@code onMasterLeave} has been
+         * called on this instance. An interruption while waiting is reported through {@code fail}.
+         *
+         * @param log Logger.
+         */
+        private void execute(IgniteLogger log) {
+            try {
+                log.info("Started execute.");
+
+                // Countdown shared job latch so that the main thread knows 
that all jobs are
+                // inside the "execute" routine.
+                jobLatch.countDown();
+
+                log.info("After job latch.");
+
+                // Await for the main thread to allow jobs to proceed.
+                latch.await();
+
+                log.info("After latch.");
+
+                if (awaitMasterLeaveCallback) {
+                    latch0.await();
+
+                    log.info("After latch0.");
+                }
+                else
+                    log.info("Latch 0 skipped.");
+            }
+            catch (InterruptedException e) {
+                // We do not expect any interruptions here, hence this 
statement.
+                fail("Unexpected exception: " + e);
+            }
+        }
+
+        /**
+         * Master-leave callback: logs the job, releases {@code latch0} so that a waiting
+         * {@code execute} call can finish, and counts down the shared {@code invokeLatch}.
+         *
+         * @param log Logger.
+         * @param job Actual job.
+         */
+        private void onMasterLeave(IgniteLogger log, Object job) {
+            log.info("Callback executed: " + job);
+
+            latch0.countDown();
+
+            invokeLatch.countDown();
+        }
+    }
+
+    /**
+     * Master leave aware callable. Both methods delegate to a private
+     * {@link TestMasterLeaveAware} instance; {@code call} always returns {@code null}.
+     */
+    private static class TestCallable implements Callable<Void>, 
ComputeJobMasterLeaveAware {
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Shared master-leave test logic. */
+        private TestMasterLeaveAware masterLeaveAware = new 
TestMasterLeaveAware();
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            masterLeaveAware.execute(log);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteCheckedException {
+            masterLeaveAware.onMasterLeave(log, this);
+        }
+    }
+
+    /**
+     * Master leave aware runnable. Both methods delegate to a private
+     * {@link TestMasterLeaveAware} instance.
+     */
+    private static class TestRunnable implements Runnable, 
ComputeJobMasterLeaveAware {
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Shared master-leave test logic. */
+        private TestMasterLeaveAware masterLeaveAware = new 
TestMasterLeaveAware();
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            masterLeaveAware.execute(log);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteCheckedException {
+            masterLeaveAware.onMasterLeave(log, this);
+        }
+    }
+
+    /**
+     * Master leave aware closure. Both methods delegate to a private
+     * {@link TestMasterLeaveAware} instance; the closure argument is ignored and {@code apply}
+     * always returns {@code null}.
+     */
+    private static class TestClosure implements IgniteClosure<String, Void>, 
ComputeJobMasterLeaveAware {
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Shared master-leave test logic. */
+        private TestMasterLeaveAware masterLeaveAware = new 
TestMasterLeaveAware();
+
+        /** {@inheritDoc} */
+        @Override public Void apply(String arg) {
+            masterLeaveAware.execute(log);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteCheckedException {
+            masterLeaveAware.onMasterLeave(log, this);
+        }
+    }
+
+    /**
+     * Base implementation of dummy task which produces predefined amount of test jobs on split
+     * stage. The number of jobs is fixed by the constructor argument and does not depend on the
+     * grid size; {@code reduce} always returns {@code null}.
+     */
+    private static class TestTask extends ComputeTaskSplitAdapter<String, 
Integer> {
+        /** How many jobs to produce. */
+        private int jobCnt;
+
+        /** Task session (not read anywhere in this class). */
+        @IgniteTaskSessionResource
+        private ComputeTaskSession taskSes;
+
+        /**
+         * Constructor.
+         *
+         * @param jobCnt How many jobs to produce on split stage.
+         */
+        private TestTask(int jobCnt) {
+            this.jobCnt = jobCnt;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Creates {@code jobCnt} {@link TestJob} instances; {@code gridSize} and {@code arg} are
+         * ignored.
+         */
+        @Override protected Collection<? extends ComputeJob> split(int 
gridSize, String arg) throws IgniteCheckedException {
+            Collection<ComputeJobAdapter> jobs = new ArrayList<>(jobCnt);
+
+            for (int i = 0; i < jobCnt; i++)
+                jobs.add(new TestJob());
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer reduce(List<ComputeJobResult> results) throws 
IgniteCheckedException {
+            return null;
+        }
+    }
+
+    /**
+     * Base implementation of dummy test job. Both methods delegate to a private
+     * {@link TestMasterLeaveAware} instance; {@code execute} always returns {@code null}.
+     */
+    private static class TestJob extends ComputeJobAdapter implements 
ComputeJobMasterLeaveAware {
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Shared master-leave test logic. */
+        private TestMasterLeaveAware masterLeaveAware = new 
TestMasterLeaveAware();
+
+        /**
+         * Constructor. Passes a fresh placeholder {@code Object} to the adapter as the job argument.
+         */
+        private TestJob() {
+            super(new Object());
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteCheckedException {
+            masterLeaveAware.execute(log);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws 
IgniteCheckedException {
+            masterLeaveAware.onMasterLeave(log, this);
+        }
+    }
+
+    /**
+     * Communication SPI which could optionally block outgoing messages. It can also signal when a
+     * {@link GridJobExecuteResponse} is about to be sent and hold the sending thread on a latch
+     * until the test releases it.
+     */
+    private static class CommunicationSpi extends TcpCommunicationSpi {
+        /** Whether to block all outgoing messages. */
+        private volatile boolean block;
+
+        /** Job execution response latch. Counted down when a job execute response reaches this SPI. */
+        private CountDownLatch respLatch = new CountDownLatch(1);
+
+        /** Whether to wait for a wait latch before returning. */
+        private volatile boolean wait;
+
+        /** Wait latch. */
+        private CountDownLatch waitLatch = new CountDownLatch(1);
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, 
GridTcpCommunicationMessageAdapter msg)
+            throws IgniteSpiException {
+            sendMessage0(node, msg);
+        }
+
+        /**
+         * Sends the message through the parent SPI unless blocking has been switched on, in which
+         * case the message is silently dropped (this applies to every message type).
+         * <p>
+         * Before that, if the message is a {@code GridIoMessage} carrying a
+         * {@link GridJobExecuteResponse}, the response latch is counted down and, when wait mode is
+         * on, the calling thread parks on the wait latch. An interruption of that wait is ignored.
+         *
+         * @param node Destination node.
+         * @param msg Message to be sent.
+         * @throws org.apache.ignite.spi.IgniteSpiException If failed.
+         */
+        private void sendMessage0(ClusterNode node, 
GridTcpCommunicationMessageAdapter msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                GridIoMessage msg0 = (GridIoMessage)msg;
+
+                if (msg0.message() instanceof GridJobExecuteResponse) {
+                    respLatch.countDown();
+
+                    if (wait) {
+                        try {
+                            U.await(waitLatch);
+                        }
+                        catch (IgniteInterruptedException ignore) {
+                            // No-op.
+                        }
+                    }
+                }
+            }
+
+            if (!block)
+                super.sendMessage(node, msg);
+        }
+
+        /**
+         * Block all outgoing messages.
+         */
+        void blockMessages() {
+            block = true;
+        }
+
+        /**
+         * Switches on wait mode: from now on sending a job execute response parks on the wait latch.
+         */
+        private void waitLatch() {
+            wait = true;
+        }
+
+        /**
+         * Count down wait latch.
+         */
+        private void releaseWaitLatch() {
+            waitLatch.countDown();
+        }
+
+        /**
+         * Await for job execution response to come.
+         *
+         * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
+         */
+        private void awaitResponse() throws IgniteInterruptedException {
+            U.await(respLatch);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
new file mode 100644
index 0000000..d918ccb
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.collision.jobstealing.*;
+import org.apache.ignite.spi.failover.jobstealing.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.testframework.config.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Job stealing test.
+ */
+@SuppressWarnings("unchecked")
+@GridCommonTest(group = "Kernal Self")
+public class GridJobStealingSelfTest extends GridCommonAbstractTest {
+    /** Task execution timeout in milliseconds. */
+    private static final int TASK_EXEC_TIMEOUT_MS = 50000;
+
+    /** */
+    private Ignite ignite1;
+
+    /** */
+    private Ignite ignite2;
+
+    /** Job distribution map. Records which job has run on which node. */
+    private static Map<UUID, Collection<ComputeJob>> jobDistrMap = new 
HashMap<>();
+
+    /** */
+    public GridJobStealingSelfTest() {
+        super(false /* don't start grid*/);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        jobDistrMap.clear();
+
+        ignite1 = startGrid(1);
+
+        ignite2 = startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        ignite1 = null;
+        ignite2 = null;
+    }
+
+    /**
+     * Test 2 jobs on 1 node.
+     *
+     * @throws IgniteCheckedException If test failed.
+     */
+    public void testTwoJobs() throws IgniteCheckedException {
+        executeAsync(ignite1.compute(), new JobStealingSingleNodeTask(2), 
null).get(TASK_EXEC_TIMEOUT_MS);
+
+        // Verify that 1 job was stolen by second node.
+        assertEquals(2, jobDistrMap.keySet().size());
+        assertEquals(1, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+        assertEquals(1, 
jobDistrMap.get(ignite2.cluster().localNode().id()).size());
+    }
+
+    /**
+     * Test 2 jobs on 1 node with null predicate.
+     *
+     * @throws IgniteCheckedException If test failed.
+     */
+    @SuppressWarnings("NullArgumentToVariableArgMethod")
+    public void testTwoJobsNullPredicate() throws IgniteCheckedException {
+        executeAsync(ignite1.compute(), new JobStealingSingleNodeTask(2), 
null).get(TASK_EXEC_TIMEOUT_MS);
+
+        // Verify that 1 job was stolen by second node.
+        assertEquals(2, jobDistrMap.keySet().size());
+        assertEquals(1, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+        assertEquals(1, 
jobDistrMap.get(ignite2.cluster().localNode().id()).size());
+    }
+
+    /**
+     * Test 2 jobs on 1 node with null predicate using string task name.
+     *
+     * @throws IgniteCheckedException If test failed.
+     */
+    @SuppressWarnings("NullArgumentToVariableArgMethod")
+    public void testTwoJobsTaskNameNullPredicate() throws 
IgniteCheckedException {
+        executeAsync(ignite1.compute(), 
JobStealingSingleNodeTask.class.getName(), null).get(TASK_EXEC_TIMEOUT_MS);
+
+        // Verify that 1 job was stolen by second node.
+        assertEquals(2, jobDistrMap.keySet().size());
+        assertEquals(1, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+        assertEquals(1, 
jobDistrMap.get(ignite2.cluster().localNode().id()).size());
+    }
+
+    /**
+     * Test 2 jobs on 1 node when one of the predicates is null.
+     *
+     * @throws IgniteCheckedException If test failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testTwoJobsPartiallyNullPredicate() throws 
IgniteCheckedException {
+        IgnitePredicate<ClusterNode> topPred =  new 
IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode e) {
+                    return ignite2.cluster().localNode().id().equals(e.id()); 
// Limit projection with only grid2.
+                }
+            };
+
+        
executeAsync(compute(ignite1.cluster().forPredicate(topPred)).withTimeout(TASK_EXEC_TIMEOUT_MS),
+            new JobStealingSpreadTask(2), null).get(TASK_EXEC_TIMEOUT_MS);
+
+        assertEquals(1, jobDistrMap.keySet().size());
+        assertEquals(2, 
jobDistrMap.get(ignite2.cluster().localNode().id()).size());
+        
assertFalse(jobDistrMap.containsKey(ignite1.cluster().localNode().id()));
+    }
+
+    /**
+     * Tests that projection predicate is taken into account by Stealing SPI.
+     *
+     * @throws Exception If failed.
+     */
+    public void testProjectionPredicate() throws Exception {
+        final Ignite ignite3 = startGrid(3);
+
+        executeAsync(compute(ignite1.cluster().forPredicate(new 
P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode e) {
+                return ignite1.cluster().localNode().id().equals(e.id()) ||
+                    ignite3.cluster().localNode().id().equals(e.id()); // 
Limit projection with only grid1 or grid3 node.
+            }
+        })), new JobStealingSpreadTask(4), null).get(TASK_EXEC_TIMEOUT_MS);
+
+        // Verify that jobs were run only on grid1 and grid3 (not on grid2)
+        assertEquals(2, jobDistrMap.keySet().size());
+        assertEquals(2, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+        assertEquals(2, 
jobDistrMap.get(ignite3.cluster().localNode().id()).size());
+        
assertFalse(jobDistrMap.containsKey(ignite2.cluster().localNode().id()));
+    }
+
+    /**
+     * Tests that projection predicate is taken into account by Stealing SPI,
+     * and that jobs in projection can steal tasks from each other.
+     *
+     * @throws Exception If failed.
+     */
+    public void testProjectionPredicateInternalStealing() throws Exception {
+        final Ignite ignite3 = startGrid(3);
+
+        IgnitePredicate<ClusterNode> p = new P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode e) {
+                return ignite1.cluster().localNode().id().equals(e.id()) ||
+                    ignite3.cluster().localNode().id().equals(e.id()); // 
Limit projection with only grid1 or grid3 node.
+            }
+        };
+
+        executeAsync(compute(ignite1.cluster().forPredicate(p)), new 
JobStealingSingleNodeTask(4), null).get(TASK_EXEC_TIMEOUT_MS);
+
+        // Verify that jobs were run only on grid1 and grid3 (not on grid2)
+        assertEquals(2, jobDistrMap.keySet().size());
+        
assertFalse(jobDistrMap.containsKey(ignite2.cluster().localNode().id()));
+    }
+
+    /**
+     * Tests that a job is not cancelled if there are no
+     * available thief nodes in topology.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleNodeTopology() throws Exception {
+        IgnitePredicate<ClusterNode> p = new IgnitePredicate<ClusterNode>() {
+            @Override public boolean apply(ClusterNode e) {
+                return ignite1.cluster().localNode().id().equals(e.id()); // 
Limit projection with only grid1 node.
+            }
+        };
+
+        executeAsync(compute(ignite1.cluster().forPredicate(p)), new 
JobStealingSpreadTask(2), null).
+            get(TASK_EXEC_TIMEOUT_MS);
+
+        assertEquals(1, jobDistrMap.keySet().size());
+        assertEquals(2, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+    }
+
+    /**
+     * Tests that a job is not cancelled if there are no
+     * available thief nodes in projection.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleNodeProjection() throws Exception {
+        ClusterGroup prj = 
ignite1.cluster().forNodeIds(Collections.singleton(ignite1.cluster().localNode().id()));
+
+        executeAsync(compute(prj), new JobStealingSpreadTask(2), 
null).get(TASK_EXEC_TIMEOUT_MS);
+
+        assertEquals(1, jobDistrMap.keySet().size());
+        assertEquals(2, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+    }
+
+    /**
+     * Tests that a job is not cancelled if there are no
+     * available thief nodes in projection. Uses null predicate.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("NullArgumentToVariableArgMethod")
+    public void testSingleNodeProjectionNullPredicate() throws Exception {
+        ClusterGroup prj = 
ignite1.cluster().forNodeIds(Collections.singleton(ignite1.cluster().localNode().id()));
+
+        executeAsync(compute(prj).withTimeout(TASK_EXEC_TIMEOUT_MS), new 
JobStealingSpreadTask(2), null).
+            get(TASK_EXEC_TIMEOUT_MS);
+
+        assertEquals(1, jobDistrMap.keySet().size());
+        assertEquals(2, 
jobDistrMap.get(ignite1.cluster().localNode().id()).size());
+    }
+
+    /**
+     * Tests job stealing with peer deployment and different class loaders.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testProjectionPredicateDifferentClassLoaders() throws 
Exception {
+        final Ignite ignite3 = startGrid(3);
+
+        URL[] clsLdrUrls;
+        try {
+            clsLdrUrls = new URL[] {new 
URL(GridTestProperties.getProperty("p2p.uri.cls"))};
+        }
+        catch (MalformedURLException e) {
+            throw new RuntimeException("Define property p2p.uri.cls", e);
+        }
+
+        ClassLoader ldr1 = new URLClassLoader(clsLdrUrls, 
getClass().getClassLoader());
+
+        Class taskCls = 
ldr1.loadClass("org.gridgain.grid.tests.p2p.JobStealingTask");
+        Class nodeFilterCls = 
ldr1.loadClass("org.gridgain.grid.tests.p2p.GridExcludeNodeFilter");
+
+        IgnitePredicate<ClusterNode> nodeFilter = 
(IgnitePredicate<ClusterNode>)nodeFilterCls
+            
.getConstructor(UUID.class).newInstance(ignite2.cluster().localNode().id());
+
+        Map<UUID, Integer> ret = (Map<UUID, 
Integer>)executeAsync(compute(ignite1.cluster().forPredicate(nodeFilter)),
+            taskCls, null).get(TASK_EXEC_TIMEOUT_MS);
+
+        assert ret != null;
+        assert ret.get(ignite1.cluster().localNode().id()) != null && 
ret.get(ignite1.cluster().localNode().id()) == 2 :
+            ret.get(ignite1.cluster().localNode().id());
+        assert ret.get(ignite3.cluster().localNode().id()) != null && 
ret.get(ignite3.cluster().localNode().id()) == 2 :
+            ret.get(ignite3.cluster().localNode().id());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();
+
+        // One job at a time.
+        colSpi.setActiveJobsThreshold(1);
+        colSpi.setWaitJobsThreshold(0);
+
+        JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();
+
+        // Verify defaults.
+        assert failSpi.getMaximumFailoverAttempts() == 
JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;
+
+        cfg.setCollisionSpi(colSpi);
+        cfg.setFailoverSpi(failSpi);
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        return cfg;
+    }
+
+    /**
+     * Job stealing task, that spreads jobs equally over the grid.
+     */
+    private static class JobStealingSpreadTask extends 
ComputeTaskAdapter<Object, Object> {
+        /** Grid. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** Number of jobs to spawn from task. */
+        protected final int nJobs;
+
+        /**
+         * Constructs a new task instance.
+         *
+         * @param nJobs Number of jobs to spawn from this task.
+         */
+        JobStealingSpreadTask(int nJobs) {
+            this.nJobs = nJobs;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("ForLoopReplaceableByForEach")
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteCheckedException {
+            //assert subgrid.size() == 2 : "Invalid subgrid size: " + 
subgrid.size();
+
+            Map<ComputeJobAdapter, ClusterNode> map = new 
HashMap<>(subgrid.size());
+
+            Iterator<ClusterNode> subIter = subgrid.iterator();
+
+            // Spread jobs over subgrid.
+            for (int i = 0; i < nJobs; i++) {
+                if (!subIter.hasNext())
+                    subIter = subgrid.iterator(); // wrap around
+
+                map.put(new GridJobStealingJob(5000L), subIter.next());
+            }
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SuspiciousMethodCalls")
+        @Override public Object reduce(List<ComputeJobResult> results) throws 
IgniteCheckedException {
+            for (ComputeJobResult res : results) {
+                log.info("Job result: " + res.getData());
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     * Job stealing task, that puts all jobs onto one node.
+     */
+    private static class JobStealingSingleNodeTask extends 
JobStealingSpreadTask {
+        /** @param nJobs Number of jobs to spawn from this task. */
+        JobStealingSingleNodeTask(int nJobs) {
+            super(nJobs);
+        }
+
+        /**
+         * Default constructor.
+         *
+         * Uses 2 jobs.
+         */
+        JobStealingSingleNodeTask() {
+            super(2);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("ForLoopReplaceableByForEach")
+        @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteCheckedException {
+            assert subgrid.size() > 1 : "Invalid subgrid size: " + 
subgrid.size();
+
+            Map<ComputeJobAdapter, ClusterNode> map = new 
HashMap<>(subgrid.size());
+
+            // Put all jobs onto one node.
+            for (int i = 0; i < nJobs; i++)
+                map.put(new GridJobStealingJob(5000L), subgrid.get(0));
+
+            return map;
+        }
+    }
+
+    /**
+     * Job stealing job. Records itself in {@code jobDistrMap} under the ID of the node it runs on
+     * and then sleeps for the number of milliseconds passed as its argument.
+     */
+    private static final class GridJobStealingJob extends ComputeJobAdapter {
+        /** Injected grid. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /**
+         * @param arg Job argument: sleep time in milliseconds.
+         */
+        GridJobStealingJob(Long arg) {
+            super(arg);
+        }
+
+        /**
+         * Registers this job in the distribution map and sleeps for the configured time.
+         *
+         * @return ID of the node the job was executed on.
+         * @throws IgniteCheckedException If the sleep was interrupted.
+         */
+        @Override public Serializable execute() throws IgniteCheckedException {
+            UUID locNodeId = ignite.cluster().localNode().id();
+
+            log.info("Started job on node: " + locNodeId);
+
+            // The distribution map is a plain HashMap shared by all jobs, and jobs on different nodes
+            // may run at the same time, so the check-then-put sequence has to be atomic.
+            synchronized (jobDistrMap) {
+                Collection<ComputeJob> jobs = jobDistrMap.get(locNodeId);
+
+                if (jobs == null) {
+                    jobs = new ArrayList<>();
+
+                    jobDistrMap.put(locNodeId, jobs);
+                }
+
+                jobs.add(this);
+            }
+
+            try {
+                Long sleep = argument(0);
+
+                assert sleep != null;
+
+                Thread.sleep(sleep);
+            }
+            catch (InterruptedException e) {
+                log.info("Job got interrupted on node: " + locNodeId);
+
+                throw new IgniteCheckedException("Job got interrupted.", e);
+            }
+            finally {
+                log.info("Job finished on node: " + locNodeId);
+            }
+
+            return locNodeId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/537f631a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java
new file mode 100644
index 0000000..de0d669
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingZeroActiveJobsSelfTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.collision.jobstealing.*;
+import org.apache.ignite.spi.failover.jobstealing.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Job stealing test.
+ */
+@GridCommonTest(group = "Kernal Self")
+public class GridJobStealingZeroActiveJobsSelfTest extends 
GridCommonAbstractTest {
+    /** */
+    private static Ignite ignite1;
+
+    /** */
+    private static Ignite ignite2;
+
+    /** */
+    public GridJobStealingZeroActiveJobsSelfTest() {
+        super(false /* don't start grid*/);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        ignite1 = startGrid(1);
+        ignite2 = startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Both references are static: clear each of them so a stopped node is not retained after the test.
+        ignite1 = null;
+        ignite2 = null;
+
+        stopGrid(1);
+        stopGrid(2);
+    }
+
+    /**
+     * Test 2 jobs on 2 nodes.
+     *
+     * @throws IgniteCheckedException If test failed.
+     */
+    public void testTwoJobs() throws IgniteCheckedException {
+        ignite1.compute().execute(JobStealingTask.class, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        JobStealingCollisionSpi colSpi = new JobStealingCollisionSpi();
+
+        // Active jobs threshold is 0 for grids whose name ends with "1" and 2 for all other grids.
+        // NOTE(review): presumably this keeps the first grid from running jobs itself, so that they
+        // get stolen by the second grid - confirm against JobStealingCollisionSpi semantics.
+        colSpi.setActiveJobsThreshold(gridName.endsWith("1") ? 0 : 2);
+        colSpi.setWaitJobsThreshold(0);
+
+        JobStealingFailoverSpi failSpi = new JobStealingFailoverSpi();
+
+        // Verify defaults.
+        assert failSpi.getMaximumFailoverAttempts() == 
JobStealingFailoverSpi.DFLT_MAX_FAILOVER_ATTEMPTS;
+
+        cfg.setCollisionSpi(colSpi);
+        cfg.setFailoverSpi(failSpi);
+
+        return cfg;
+    }
+
+    /**
+     * Task that creates one job per subgrid node, maps all of them onto the local node, and checks in
+     * {@link #reduce(List)} that both results carry the name of the second grid and not of the first one.
+     */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class JobStealingTask extends ComputeTaskAdapter<Object, Object> {
+        /** Grid. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Logger. */
+        @IgniteLoggerResource
+        private IgniteLogger log;
+
+        /** {@inheritDoc} */
+        @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Object arg)
+            throws IgniteCheckedException {
+            assert subgrid.size() == 2 : "Invalid subgrid size: " + subgrid.size();
+
+            Map<ComputeJobAdapter, ClusterNode> map = new HashMap<>(subgrid.size());
+
+            ClusterNode locNode = ignite.cluster().localNode();
+
+            // The subgrid only determines how many jobs are created; every job is put onto local node.
+            for (int i = 0; i < subgrid.size(); i++)
+                map.put(new GridJobStealingJob(5000L), locNode);
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
+            assert results.size() == 2 : "Invalid results size: " + results.size();
+
+            for (ComputeJobResult res : results)
+                log.info("Job result: " + res.getData());
+
+            String name1 = results.get(0).getData();
+            String name2 = results.get(1).getData();
+
+            assert name1.equals(name2) : "Jobs were executed on different grids: " + name1 + ", " + name2;
+
+            assert !name1.equals(ignite1.name()) : "Job was executed on the first grid: " + name1;
+            assert name1.equals(ignite2.name()) : "Job was not executed on the second grid: " + name1;
+
+            return null;
+        }
+    }
+
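+    /**
+     * Job that sleeps for the number of milliseconds passed as its argument
+     * and returns the name of the grid it was executed on.
+     */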
+    @SuppressWarnings({"PublicInnerClass"})
+    public static final class GridJobStealingJob extends ComputeJobAdapter {
+        /** Injected grid. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * @param arg Job argument.
+         */
+        GridJobStealingJob(Long arg) {
+            super(arg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Serializable execute() throws IgniteCheckedException {
+            try {
+                Long sleep = argument(0);
+
+                assert sleep != null;
+
+                Thread.sleep(sleep);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteCheckedException("Job got interrupted.", e);
+            }
+
+            return ignite.name();
+        }
+    }
+}

Reply via email to