http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f49d0b7f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 0000000,76988a3..a111e53
mode 000000,100644..100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@@ -1,0 -1,1006 +1,1011 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.hadoop;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.hadoop.mapreduce.*;
+ import org.apache.ignite.igfs.*;
+ import org.apache.ignite.igfs.mapreduce.*;
+ import org.apache.ignite.igfs.secondary.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.igfs.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.testframework.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.net.*;
+ import java.util.*;
+ 
+ /**
+  * Self test of {@link IgniteHadoopMapReducePlanner}.
+  */
+ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest {
+     /** */
+     private static final UUID ID_1 = new UUID(0, 1);
+ 
+     /** */
+     private static final UUID ID_2 = new UUID(0, 2);
+ 
+     /** */
+     private static final UUID ID_3 = new UUID(0, 3);
+ 
+     /** */
+     private static final String HOST_1 = "host1";
+ 
+     /** */
+     private static final String HOST_2 = "host2";
+ 
+     /** */
+     private static final String HOST_3 = "host3";
+ 
+     /** */
+     private static final String INVALID_HOST_1 = "invalid_host1";
+ 
+     /** */
+     private static final String INVALID_HOST_2 = "invalid_host2";
+ 
+     /** */
+     private static final String INVALID_HOST_3 = "invalid_host3";
+ 
+     /** Mocked Grid. */
+     private static final MockIgnite GRID = new MockIgnite();
+ 
+     /** Mocked IGFS. */
+     private static final IgniteFileSystem IGFS = new MockIgfs();
+ 
+     /** Planner. */
+     private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner();
+ 
+     /** Block locations. */
+     private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>();
+ 
+     /** Proxy map. */
+     private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>();
+ 
+     /** Last created plan. */
+     private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>();
+ 
+     /**
+      * Inject the mocked Ignite instance into the planner once for all tests.
+      */
+     static {
+         GridTestUtils.setFieldValue(PLANNER, "ignite", GRID);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void beforeTest() throws Exception {
+         GridTestUtils.setFieldValue(PLANNER, "log", log());
+ 
+         BLOCK_MAP.clear();
+         PROXY_MAP.clear();
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
+         HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
+         HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
+         HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
+ 
+         mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
+         mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
+         mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3));
+ 
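+         // Each split is local to exactly one node, so mappers must follow the data.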
+         plan(1, split1);
+         assert ensureMappers(ID_1, split1);
+         assert ensureReducers(ID_1, 1);
+         assert ensureEmpty(ID_2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1);
+         assert ensureMappers(ID_1, split1);
+         assert ensureReducers(ID_1, 2);
+         assert ensureEmpty(ID_2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(1, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(3, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(3, split1, split2, split3);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureMappers(ID_3, split3);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureReducers(ID_3, 1);
+ 
+         plan(5, split1, split2, split3);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureMappers(ID_3, split3);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
+         HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
+         HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
+         HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
+ 
+         plan(1, split1);
+         assert ensureMappers(ID_1, split1);
+         assert ensureReducers(ID_1, 1);
+         assert ensureEmpty(ID_2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1);
+         assert ensureMappers(ID_1, split1);
+         assert ensureReducers(ID_1, 2);
+         assert ensureEmpty(ID_2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(1, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(3, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(3, split1, split2, split3);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureMappers(ID_3, split3);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureReducers(ID_3, 1);
+ 
+         plan(5, split1, split2, split3);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureMappers(ID_3, split3);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+         HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, 
HOST_2);
+         HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, 
HOST_2);
+         HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, 
HOST_3);
+ 
+         mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
+         mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
+         mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3));
+ 
+         plan(1, split1);
+         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+             ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1);
+         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+             ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(1, split1, split2);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1, split2);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(3, split1, split2, split3);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureReducers(ID_3, 1);
+ 
+         plan(5, split1, split2, split3);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+         HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, 
HOST_2);
+         HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, 
HOST_2);
+         HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, 
HOST_3);
+ 
+         plan(1, split1);
+         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+             ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1);
+         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+             ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(1, split1, split2);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1, split2);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(3, split1, split2, split3);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureReducers(ID_3, 1);
+ 
+         plan(5, split1, split2, split3);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
+         HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, 
HOST_2, HOST_3);
+         HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, 
HOST_2, HOST_3);
+ 
+         mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
+         mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
+ 
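+         // ID_1 covers both halves of split1 and ID_2 both halves of split2, so the planner
+         // maps each split to the node hosting the most of its data.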
+         plan(1, split1);
+         assert ensureMappers(ID_1, split1);
+         assert ensureReducers(ID_1, 1);
+         assert ensureEmpty(ID_2);
+         assert ensureEmpty(ID_3);
+ 
+         plan(1, split2);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_1);
+         assert ensureEmpty(ID_3);
+ 
+         plan(1, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0);
+         assert ensureEmpty(ID_3);
+ 
+         plan(2, split1, split2);
+         assert ensureMappers(ID_1, split1);
+         assert ensureMappers(ID_2, split2);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureEmpty(ID_3);
+     }
+ 
+     /**
+      * @throws IgniteCheckedException If failed.
+      */
+     public void testNonIgfsOrphans() throws IgniteCheckedException {
+         HadoopFileBlock split1 = split(false, "/file1", 0, 100, 
INVALID_HOST_1, INVALID_HOST_2);
+         HadoopFileBlock split2 = split(false, "/file2", 0, 100, 
INVALID_HOST_1, INVALID_HOST_3);
+         HadoopFileBlock split3 = split(false, "/file3", 0, 100, 
INVALID_HOST_2, INVALID_HOST_3);
+ 
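+         // None of the split hosts match a topology node, so the orphaned splits may land on any node.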
+         plan(1, split1);
+         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+             ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) ||
+             ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1);
+ 
+         plan(2, split1);
+         assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+             ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) ||
+             ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2);
+ 
+         plan(1, split1, split2, split3);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+             ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+             ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+             ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) ||
+             ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) ||
+             ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1);
+ 
+         plan(3, split1, split2, split3);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+             ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+             ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+             ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+         assert ensureReducers(ID_1, 1);
+         assert ensureReducers(ID_2, 1);
+         assert ensureReducers(ID_3, 1);
+ 
+         plan(5, split1, split2, split3);
+         assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+             ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+             ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+             ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+             ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+         assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+             ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+     }
+ 
+     /**
+      * Create plan.
+      *
+      * @param reducers Reducers count.
+      * @param splits Splits.
+      * @return Plan.
+      * @throws IgniteCheckedException If failed.
+      */
+     private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException {
+         assert reducers > 0;
+         assert splits != null && splits.length > 0;
+ 
+         Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length);
+ 
+         Collections.addAll(splitList, splits);
+ 
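+         // Fixed three-node topology; all assertions refer to ID_1..ID_3 / HOST_1..HOST_3.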
+         Collection<ClusterNode> top = new ArrayList<>();
+ 
+         GridTestNode node1 = new GridTestNode(ID_1);
+         GridTestNode node2 = new GridTestNode(ID_2);
+         GridTestNode node3 = new GridTestNode(ID_3);
+ 
+         node1.setHostName(HOST_1);
+         node2.setHostName(HOST_2);
+         node3.setHostName(HOST_3);
+ 
+         top.add(node1);
+         top.add(node2);
+         top.add(node3);
+ 
+         HadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null);
+ 
+         PLAN.set(plan);
+ 
+         return plan;
+     }
+ 
+     /**
+      * Ensure that node contains the given mappers.
+      *
+      * @param nodeId Node ID.
+      * @param expSplits Expected splits.
+      * @return {@code True} if this assumption is valid.
+      */
+     private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) {
+         Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>();
+ 
+         Collections.addAll(expSplitsCol, expSplits);
+ 
+         Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId);
+ 
+         return F.eq(expSplitsCol, splits);
+     }
+ 
+     /**
+      * Ensure that node contains the given number of reducers.
+      *
+      * @param nodeId Node ID.
+      * @param reducers Reducers.
+      * @return {@code True} if this assumption is valid.
+      */
+     private static boolean ensureReducers(UUID nodeId, int reducers) {
+         int[] reducersArr = PLAN.get().reducers(nodeId);
+ 
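+         // Zero expected reducers means the plan must contain no reducer array for this node at all.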
+         return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers);
+     }
+ 
+     /**
+      * Ensure that no mappers or reducers are located on this node.
+      *
+      * @param nodeId Node ID.
+      * @return {@code True} if this assumption is valid.
+      */
+     private static boolean ensureEmpty(UUID nodeId) {
+         return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId));
+     }
+ 
+     /**
+      * Create split.
+      *
+      * @param igfs IGFS flag.
+      * @param file File.
+      * @param start Start.
+      * @param len Length.
+      * @param hosts Hosts.
+      * @return Split.
+      */
+     private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
+         URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
+ 
+         return new HadoopFileBlock(hosts, uri, start, len);
+     }
+ 
+     /**
+      * Create block location.
+      *
+      * @param start Start.
+      * @param len Length.
+      * @param nodeIds Node IDs.
+      * @return Block location.
+      */
+     private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) {
+         assert nodeIds != null && nodeIds.length > 0;
+ 
+         Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length);
+ 
+         for (UUID id : nodeIds)
+             nodes.add(new GridTestNode(id));
+ 
+         return new IgfsBlockLocationImpl(start, len, nodes);
+     }
+ 
+     /**
+      * Map IGFS block to nodes.
+      *
+      * @param file File.
+      * @param start Start.
+      * @param len Length.
+      * @param locations Locations.
+      */
+     private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) {
+         assert locations != null && locations.length > 0;
+ 
+         IgfsPath path = new IgfsPath(file);
+ 
+         Block block = new Block(path, start, len);
+ 
+         Collection<IgfsBlockLocation> locationsList = new ArrayList<>();
+ 
+         Collections.addAll(locationsList, locations);
+ 
+         BLOCK_MAP.put(block, locationsList);
+     }
+ 
+     /**
+      * Block.
+      */
+     private static class Block {
+         /** */
+         private final IgfsPath path;
+ 
+         /** */
+         private final long start;
+ 
+         /** */
+         private final long len;
+ 
+         /**
+          * Constructor.
+          *
+          * @param path Path.
+          * @param start Start.
+          * @param len Length.
+          */
+         private Block(IgfsPath path, long start, long len) {
+             this.path = path;
+             this.start = start;
+             this.len = len;
+         }
+ 
+         /** {@inheritDoc} */
+         @SuppressWarnings("RedundantIfStatement")
+         @Override public boolean equals(Object o) {
+             if (this == o) return true;
+             if (!(o instanceof Block)) return false;
+ 
+             Block block = (Block) o;
+ 
+             if (len != block.len)
+                 return false;
+ 
+             if (start != block.start)
+                 return false;
+ 
+             if (!path.equals(block.path))
+                 return false;
+ 
+             return true;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int hashCode() {
+             int res = path.hashCode();
+ 
+             res = 31 * res + (int) (start ^ (start >>> 32));
+             res = 31 * res + (int) (len ^ (len >>> 32));
+ 
+             return res;
+         }
+     }
+ 
+     /**
+      * Mocked job. The planner only consumes the reducer count and the input splits.
+      */
+     private static class MockJob implements HadoopJob {
+         /** Reducers count. */
+         private final int reducers;
+ 
+         /** */
+         private Collection<HadoopInputSplit> splitList;
+ 
+         /**
+          * Constructor.
+          *
+          * @param reducers Reducers count.
+          * @param splitList Splits.
+          */
+         private MockJob(int reducers, Collection<HadoopInputSplit> splitList) {
+             this.reducers = reducers;
+             this.splitList = splitList;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public HadoopJobId id() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public HadoopJobInfo info() {
+             return new HadoopDefaultJobInfo() {
+                 @Override public int reducers() {
+                     return reducers;
+                 }
+             };
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+             return splitList;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void dispose(boolean external) throws IgniteCheckedException {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void cleanupStagingDirectory() {
+             // No-op.
+         }
+     }
+ 
+     /**
+      * Mocked IGFS. Only {@code isProxy()} and {@code affinity(path, start, len)} carry test logic;
+      * the remaining methods are no-op stubs required by the {@link IgfsEx} contract.
+      */
+     private static class MockIgfs implements IgfsEx {
+         /** {@inheritDoc} */
+         @Override public boolean isProxy(URI path) {
+             return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+             return BLOCK_MAP.get(new Block(path, start, len));
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+             long maxLen) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void stop() {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsContext context() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsPaths proxyPaths() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsInputStreamAdapter open(IgfsPath path) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public Boolean globalSampling() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsLocalMetrics localMetrics() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public long groupBlockSize() {
+             return 0;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public String clientLogDirectory() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void clientLogDirectory(String logDir) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean evictExclude(IgfsPath path, boolean primary) {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public String name() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public FileSystemConfiguration configuration() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean exists(IgfsPath path) {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public IgfsFile info(IgfsPath path) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsPathSummary summary(IgfsPath path) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void rename(IgfsPath src, IgfsPath dest) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean delete(IgfsPath path, boolean recursive) {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void mkdirs(IgfsPath path) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Collection<IgfsPath> listPaths(IgfsPath path) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Collection<IgfsFile> listFiles(IgfsPath path) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public long usedSpaceSize() {
+             return 0;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+             long blockSize, @Nullable Map<String, String> props) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite,
+             @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsOutputStream append(IgfsPath path, boolean create) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create,
+             @Nullable Map<String, String> props) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsMetrics metrics() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void resetMetrics() {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public long size(IgfsPath path) {
+             return 0;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void format() {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+             Collection<IgfsPath> paths, @Nullable T arg) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+             Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
+             @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls,
+             @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles,
+             long maxRangeLen, @Nullable T arg) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgniteUuid nextAffinityKey() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgniteFileSystem withAsync() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean isAsync() {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public <R> IgniteFuture<R> future() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgfsSecondaryFileSystem asSecondary() {
+             return null;
+         }
+     }
+ 
+     /**
+      * Mocked Grid.
+      */
+     @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
+     private static class MockIgnite extends IgniteSpringBean implements IgniteEx {
+         /** {@inheritDoc} */
+         @Override public IgniteClusterEx cluster() {
+             return (IgniteClusterEx)super.cluster();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public IgniteFileSystem igfsx(String name) {
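+             // The planner resolves the file system name from the split URI authority ("igfs://igfs@/..."), hence "igfs".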
+             assert F.eq("igfs", name);
+ 
+             return IGFS;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Hadoop hadoop() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String name() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls,
+             Class<V> valCls) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public <K, V> GridCache<K, V> cachex(@Nullable String name) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Nullable @Override public <K, V> GridCache<K, V> cachex() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @SuppressWarnings("unchecked")
+         @Override public Collection<GridCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super GridCache<?, ?>>... p) {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean eventUserRecordable(int type) {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean allEventsUserRecordable(int[] types) {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Collection<String> compatibleVersions() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean isJmxRemoteEnabled() {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean isRestartEnabled() {
+             return false;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public ClusterNode localNode() {
+             return null;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String latestVersion() {
+             return null;
+         }
++
++        /** {@inheritDoc} */
++        @Override public GridKernalContext context() {
++            return null;
++        }
+     }
+ }
