http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
index af3f872..a959472 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java
@@ -43,7 +43,7 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    public abstract GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+    public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
 
     /**
      * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,11 +79,11 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
         GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
                 igfs.info(inFile).length() - fileBlock1.length());
 
-        GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
 
         GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
 
-        GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob);
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
 
         ctx.mockOutput().clear();
 
@@ -110,11 +110,11 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
      * @return Context with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private GridHadoopTestTaskContext runTaskWithInput(GridHadoopV2Job gridJob, GridHadoopTaskType taskType,
+    private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, GridHadoopTaskType taskType,
         int taskNum, String... words) throws IgniteCheckedException {
         GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
 
-        GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob);
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
 
         for (int i = 0; i < words.length; i+=2) {
             List<IntWritable> valList = new ArrayList<>();
@@ -136,7 +136,7 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
      * @throws Exception If fails.
      */
     public void testReduceTask() throws Exception {
-        GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
 
         runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
         runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,9 +162,9 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
      * @throws Exception If fails.
      */
     public void testCombinerTask() throws Exception {
-        GridHadoopV2Job gridJob = getHadoopJob("/", "/");
+        HadoopV2Job gridJob = getHadoopJob("/", "/");
 
-        GridHadoopTestTaskContext ctx =
+        HadoopTestTaskContext ctx =
             runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
 
         assertEquals("word1,5; word2,10", Joiner.on("; 
").join(ctx.mockOutput()));
@@ -182,18 +182,18 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
      * @return Context of combine task with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private GridHadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, GridHadoopV2Job gridJob)
+    private HadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, HadoopV2Job gridJob)
         throws IgniteCheckedException {
         GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
 
-        GridHadoopTestTaskContext mapCtx = new GridHadoopTestTaskContext(taskInfo, gridJob);
+        HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob);
 
         mapCtx.run();
 
         //Prepare input for combine
         taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);
 
-        GridHadoopTestTaskContext combineCtx = new GridHadoopTestTaskContext(taskInfo, gridJob);
+        HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob);
 
         combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
 
@@ -228,16 +228,16 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun
         GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, l);
         GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
 
-        GridHadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
 
-        GridHadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
+        HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
 
-        GridHadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
+        HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
 
         //Prepare input for combine
         GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, gridJob.id(), 0, 0, null);
 
-        GridHadoopTestTaskContext reduceCtx = new GridHadoopTestTaskContext(taskInfo, gridJob);
+        HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob);
 
         reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
         reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
index c6b10bd..679be71 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java
@@ -38,7 +38,7 @@ public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
         JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile);
 
         setupFileSystems(jobConf);
@@ -47,7 +47,7 @@ public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest {
 
         GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
 
-        return new GridHadoopV2Job(jobId, jobInfo, log);
+        return new HadoopV2Job(jobId, jobInfo, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
index 13dd688..4d20b9c 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java
@@ -42,7 +42,7 @@ public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws Exception if fails.
      */
-    @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
         Job job = Job.getInstance();
 
         job.setOutputKeyClass(Text.class);
@@ -65,7 +65,7 @@ public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest {
 
         GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
 
-        return new GridHadoopV2Job(jobId, jobInfo, log);
+        return new HadoopV2Job(jobId, jobInfo, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java
deleted file mode 100644
index 80b00a6..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Context for test purpose.
- */
-class GridHadoopTestTaskContext extends GridHadoopV2TaskContext {
-    /**
-     * Simple key-vale pair.
-     * @param <K> Key class.
-     * @param <V> Value class.
-     */
-    public static class Pair<K,V> {
-        /** Key */
-        private K key;
-
-        /** Value */
-        private V val;
-
-        /**
-         * @param key key.
-         * @param val value.
-         */
-        Pair(K key, V val) {
-            this.key = key;
-            this.val = val;
-        }
-
-        /**
-         * Getter of key.
-         * @return key.
-         */
-        K key() {
-            return key;
-        }
-
-        /**
-         * Getter of value.
-         * @return value.
-         */
-        V value() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return key + "," + val;
-        }
-    }
-
-    /** Mock output container- result data of task execution if it is not overridden. */
-    private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
-
-    /** Mock input container- input data if it is not overridden. */
-    private Map<Object,List> mockInput = new TreeMap<>();
-
-    /** Context output implementation to write data into mockOutput. */
-    private GridHadoopTaskOutput output = new GridHadoopTaskOutput() {
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) {
-            //Check of casting and extract/copy values
-            String strKey = new String(((Text)key).getBytes());
-            int intVal = ((IntWritable)val).get();
-
-            mockOutput().add(new Pair<>(strKey, intVal));
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            throw new UnsupportedOperationException();
-        }
-    };
-
-    /** Context input implementation to read data from mockInput. */
-    private GridHadoopTaskInput input = new GridHadoopTaskInput() {
-        /** Iterator of keys and associated lists of values. */
-        Iterator<Map.Entry<Object, List>> iter;
-
-        /** Current key and associated value list. */
-        Map.Entry<Object, List> currEntry;
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (iter == null)
-                iter = mockInput().entrySet().iterator();
-
-            if (iter.hasNext())
-                currEntry = iter.next();
-            else
-                currEntry = null;
-
-            return currEntry != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return currEntry.getKey();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            return currEntry.getValue().iterator() ;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            throw new UnsupportedOperationException();
-        }
-    };
-
-    /**
-     * Getter of mock output container - result of task if it is not overridden.
-     *
-     * @return mock output.
-     */
-    public List<Pair<String, Integer>> mockOutput() {
-        return mockOutput;
-    }
-
-    /**
-     * Getter of mock input container- input data if it is not overridden.
-     *
-     * @return mock output.
-     */
-    public Map<Object, List> mockInput() {
-        return mockInput;
-    }
-
-    /**
-     * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects.
-     * The result is placed into mock input.
-     *
-     * @param flatData list of key-value pair.
-     */
-    public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
-        Text key = new Text();
-
-        for (GridHadoopTestTaskContext.Pair<String, Integer> pair : flatData) {
-            key.set(pair.key);
-            ArrayList<IntWritable> valList;
-
-            if (!mockInput.containsKey(key)) {
-                valList = new ArrayList<>();
-                mockInput.put(key, valList);
-                key = new Text();
-            }
-            else
-                valList = (ArrayList<IntWritable>) mockInput.get(key);
-            valList.add(new IntWritable(pair.value()));
-        }
-    }
-
-    /**
-     * @param taskInfo Task info.
-     * @param gridJob Grid Hadoop job.
-     */
-    public GridHadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException {
-        super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
-    }
-
-    /**
-     * Creates DataInput to read JobConf.
-     *
-     * @param job Job.
-     * @return DataInput with JobConf.
-     * @throws IgniteCheckedException If failed.
-     */
-    private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException {
-        JobConf jobConf = new JobConf();
-
-        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet())
-            jobConf.set(e.getKey(), e.getValue());
-
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
-        try {
-            jobConf.write(new DataOutputStream(buf));
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopTaskOutput output() {
-        return output;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopTaskInput input() {
-        return input;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java
deleted file mode 100644
index c7a456b..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.serializer.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
-
-/**
- * Self test of {@link GridHadoopV2Job}.
- */
-public class GridHadoopV2JobSelfTest extends GridHadoopAbstractSelfTest {
-    /** */
-    private static final String TEST_SERIALIZED_VALUE = "Test serialized value";
-
-    /**
-     * Custom serialization class that accepts {@link Writable}.
-     */
-    private static class CustomSerialization extends WritableSerialization {
-        /** {@inheritDoc} */
-        @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) {
-            return new Deserializer<Writable>() {
-                @Override public void open(InputStream in) { }
-
-                @Override public Writable deserialize(Writable writable) {
-                    return new Text(TEST_SERIALIZED_VALUE);
-                }
-
-                @Override public void close() { }
-            };
-        }
-    }
-
-    /**
-     * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration.
-     *
-     * @throws IgniteCheckedException If fails.
-     */
-    public void testCustomSerializationApplying() throws IgniteCheckedException {
-        JobConf cfg = new JobConf();
-
-        cfg.setMapOutputKeyClass(IntWritable.class);
-        cfg.setMapOutputValueClass(Text.class);
-        cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
-
-        GridHadoopJob job = new GridHadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
-
-        GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0,
-            null));
-
-        GridHadoopSerialization ser = taskCtx.keySerialization();
-
-        assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName());
-
-        DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-
-        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
-
-        ser = taskCtx.valueSerialization();
-
-        assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName());
-
-        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
new file mode 100644
index 0000000..116248f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Test of wrapper of the native serialization.
+ */
+public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
+    /**
+     * Tests read/write of IntWritable via native WritableSerialization.
+     * @throws Exception If fails.
+     */
+    public void testIntWritableSerialization() throws Exception {
+        GridHadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, new IntWritable(3));
+        ser.write(out, new IntWritable(-5));
+
+        assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", 
Arrays.toString(buf.toByteArray()));
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((IntWritable)ser.read(in, null)).get());
+        assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
+    }
+
+    /**
+     * Tests read/write of Integer via native JavaSerialization.
+     * @throws Exception If fails.
+     */
+    public void testIntJavaSerialization() throws Exception {
+        GridHadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, 3);
+        ser.write(out, -5);
+        ser.close();
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((Integer)ser.read(in, null)).intValue());
+        assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
new file mode 100644
index 0000000..040730b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}.
+ */
+public class HadoopSplitWrapperSelfTest extends GridHadoopAbstractSelfTest {
+    /**
+     * Tests serialization of wrapper and the wrapped native split.
+     * @throws Exception If fails.
+     */
+    public void testSerialization() throws Exception {
+        FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 
500, new String[]{"host1", "host2"});
+
+        assertEquals("/path/to/file:100+500", nativeSplit.toString());
+
+        HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations());
+
+        assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        ObjectOutput out = new ObjectOutputStream(buf);
+
+        out.writeObject(split);
+
+        ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject();
+
+        assertEquals("/path/to/file:100+500", 
HadoopUtils.unwrapSplit(res).toString());
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                res.hosts();
+
+                return null;
+            }
+        }, AssertionError.class, null);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
new file mode 100644
index 0000000..9b56300
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Context for test purpose.
+ */
+class HadoopTestTaskContext extends HadoopV2TaskContext {
+    /**
+     * Simple key-vale pair.
+     * @param <K> Key class.
+     * @param <V> Value class.
+     */
+    public static class Pair<K,V> {
+        /** Key */
+        private K key;
+
+        /** Value */
+        private V val;
+
+        /**
+         * @param key key.
+         * @param val value.
+         */
+        Pair(K key, V val) {
+            this.key = key;
+            this.val = val;
+        }
+
+        /**
+         * Getter of key.
+         * @return key.
+         */
+        K key() {
+            return key;
+        }
+
+        /**
+         * Getter of value.
+         * @return value.
+         */
+        V value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return key + "," + val;
+        }
+    }
+
+    /** Mock output container- result data of task execution if it is not overridden. */
+    private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
+
+    /** Mock input container- input data if it is not overridden. */
+    private Map<Object,List> mockInput = new TreeMap<>();
+
+    /** Context output implementation to write data into mockOutput. */
+    private GridHadoopTaskOutput output = new GridHadoopTaskOutput() {
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) {
+            //Check of casting and extract/copy values
+            String strKey = new String(((Text)key).getBytes());
+            int intVal = ((IntWritable)val).get();
+
+            mockOutput().add(new Pair<>(strKey, intVal));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /** Context input implementation to read data from mockInput. */
+    private GridHadoopTaskInput input = new GridHadoopTaskInput() {
+        /** Iterator of keys and associated lists of values. */
+        Iterator<Map.Entry<Object, List>> iter;
+
+        /** Current key and associated value list. */
+        Map.Entry<Object, List> currEntry;
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (iter == null)
+                iter = mockInput().entrySet().iterator();
+
+            if (iter.hasNext())
+                currEntry = iter.next();
+            else
+                currEntry = null;
+
+            return currEntry != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return currEntry.getKey();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            return currEntry.getValue().iterator() ;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /**
+     * Getter of mock output container - result of task if it is not overridden.
+     *
+     * @return mock output.
+     */
+    public List<Pair<String, Integer>> mockOutput() {
+        return mockOutput;
+    }
+
+    /**
+     * Getter of mock input container- input data if it is not overridden.
+     *
+     * @return mock output.
+     */
+    public Map<Object, List> mockInput() {
+        return mockInput;
+    }
+
+    /**
+     * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects.
+     * The result is placed into mock input.
+     *
+     * @param flatData list of key-value pair.
+     */
+    public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
+        Text key = new Text();
+
+        for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) {
+            key.set(pair.key);
+            ArrayList<IntWritable> valList;
+
+            if (!mockInput.containsKey(key)) {
+                valList = new ArrayList<>();
+                mockInput.put(key, valList);
+                key = new Text();
+            }
+            else
+                valList = (ArrayList<IntWritable>) mockInput.get(key);
+            valList.add(new IntWritable(pair.value()));
+        }
+    }
+
+    /**
+     * @param taskInfo Task info.
+     * @param gridJob Grid Hadoop job.
+     */
+    public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException {
+        super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
+    }
+
+    /**
+     * Creates DataInput to read JobConf.
+     *
+     * @param job Job.
+     * @return DataInput with JobConf.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException {
+        JobConf jobConf = new JobConf();
+
+        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet())
+            jobConf.set(e.getKey(), e.getValue());
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        try {
+            jobConf.write(new DataOutputStream(buf));
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopTaskOutput output() {
+        return output;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopTaskInput input() {
+        return input;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
new file mode 100644
index 0000000..66e35b5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}.
+ */
+public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest {
+    /** */
+    private static final String TEST_SERIALIZED_VALUE = "Test serialized 
value";
+
+    /**
+     * Custom serialization class that accepts {@link Writable}.
+     */
+    private static class CustomSerialization extends WritableSerialization {
+        /** {@inheritDoc} */
+        @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+            return new Deserializer<Writable>() {
+                @Override public void open(InputStream in) { }
+
+                @Override public Writable deserialize(Writable writable) {
+                    return new Text(TEST_SERIALIZED_VALUE);
+                }
+
+                @Override public void close() { }
+            };
+        }
+    }
+
+    /**
+     * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration.
+     *
+     * @throws IgniteCheckedException If fails.
+     */
+    public void testCustomSerializationApplying() throws IgniteCheckedException {
+        JobConf cfg = new JobConf();
+
+        cfg.setMapOutputKeyClass(IntWritable.class);
+        cfg.setMapOutputValueClass(Text.class);
+        cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+        GridHadoopJob job = new HadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+
+        GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0,
+            null));
+
+        GridHadoopSerialization ser = taskCtx.keySerialization();
+
+        assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
+
+        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+
+        ser = taskCtx.valueSerialization();
+
+        assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
index 716fe19..aa0ddc1 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
@@ -60,12 +60,12 @@ public abstract class GridHadoopAbstractMapTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException {
-            return new GridHadoopWritableSerialization(IntWritable.class);
+            return new HadoopWritableSerialization(IntWritable.class);
         }
 
         /** {@inheritDoc} */
         @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException {
-            return new GridHadoopWritableSerialization(IntWritable.class);
+            return new HadoopWritableSerialization(IntWritable.class);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java
deleted file mode 100644
index dd3c5d4..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Tests Hadoop external communication component.
- */
-public class GridHadoopExternalCommunicationSelfTest extends GridCommonAbstractTest {
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleMessageSendingTcp() throws Exception {
-        checkSimpleMessageSending(false);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleMessageSendingShmem() throws Exception {
-        checkSimpleMessageSending(true);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkSimpleMessageSending(boolean useShmem) throws Exception {
-        UUID parentNodeId = UUID.randomUUID();
-
-        Marshaller marsh = new OptimizedMarshaller();
-
-        IgniteLogger log = log();
-
-        GridHadoopExternalCommunication[] comms = new GridHadoopExternalCommunication[4];
-
-        try {
-            String name = "grid";
-
-            TestHadoopListener[] lsnrs = new TestHadoopListener[4];
-
-            int msgs = 10;
-
-            for (int i = 0; i < comms.length; i++) {
-                comms[i] = new GridHadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log,
-                    Executors.newFixedThreadPool(1), name + i);
-
-                if (useShmem)
-                    comms[i].setSharedMemoryPort(14000);
-
-                lsnrs[i] = new TestHadoopListener(msgs);
-
-                comms[i].setListener(lsnrs[i]);
-
-                comms[i].start();
-            }
-
-            for (int r = 0; r < msgs; r++) {
-                for (int from = 0; from < comms.length; from++) {
-                    for (int to = 0; to < comms.length; to++) {
-                        if (from == to)
-                            continue;
-
-                        comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to));
-                    }
-                }
-            }
-
-            U.sleep(1000);
-
-            for (TestHadoopListener lsnr : lsnrs) {
-                lsnr.await(3_000);
-
-                assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size());
-            }
-        }
-        finally {
-            for (GridHadoopExternalCommunication comm : comms) {
-                if (comm != null)
-                    comm.stop();
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestHadoopListener implements GridHadoopMessageListener {
-        /** Received messages (array list is safe because executor has one thread). */
-        private Collection<TestMessage> msgs = new ArrayList<>();
-
-        /** Await latch. */
-        private CountDownLatch receiveLatch;
-
-        /**
-         * @param msgs Number of messages to await.
-         */
-        private TestHadoopListener(int msgs) {
-            receiveLatch = new CountDownLatch(msgs);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
-            assert msg instanceof TestMessage;
-
-            msgs.add((TestMessage)msg);
-
-            receiveLatch.countDown();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
-            // No-op.
-        }
-
-        /**
-         * @return Received messages.
-         */
-        public Collection<TestMessage> messages() {
-            return msgs;
-        }
-
-        /**
-         * @param millis Time to await.
-         * @throws InterruptedException If wait interrupted.
-         */
-        public void await(int millis) throws InterruptedException {
-            receiveLatch.await(millis, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestMessage implements HadoopMessage {
-        /** From index. */
-        private int from;
-
-        /** To index. */
-        private int to;
-
-        /**
-         * @param from From index.
-         * @param to To index.
-         */
-        private TestMessage(int from, int to) {
-            this.from = from;
-            this.to = to;
-        }
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public TestMessage() {
-            // No-op.
-        }
-
-        /**
-         * @return From index.
-         */
-        public int from() {
-            return from;
-        }
-
-        /**
-         * @return To index.
-         */
-        public int to() {
-            return to;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(from);
-            out.writeInt(to);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            from = in.readInt();
-            to = in.readInt();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
new file mode 100644
index 0000000..a21633d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Tests Hadoop external communication component.
+ */
+public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest {
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleMessageSendingTcp() throws Exception {
+        checkSimpleMessageSending(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleMessageSendingShmem() throws Exception {
+        checkSimpleMessageSending(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkSimpleMessageSending(boolean useShmem) throws Exception {
+        UUID parentNodeId = UUID.randomUUID();
+
+        Marshaller marsh = new OptimizedMarshaller();
+
+        IgniteLogger log = log();
+
+        HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4];
+
+        try {
+            String name = "grid";
+
+            TestHadoopListener[] lsnrs = new TestHadoopListener[4];
+
+            int msgs = 10;
+
+            for (int i = 0; i < comms.length; i++) {
+                comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log,
+                    Executors.newFixedThreadPool(1), name + i);
+
+                if (useShmem)
+                    comms[i].setSharedMemoryPort(14000);
+
+                lsnrs[i] = new TestHadoopListener(msgs);
+
+                comms[i].setListener(lsnrs[i]);
+
+                comms[i].start();
+            }
+
+            for (int r = 0; r < msgs; r++) {
+                for (int from = 0; from < comms.length; from++) {
+                    for (int to = 0; to < comms.length; to++) {
+                        if (from == to)
+                            continue;
+
+                        comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to));
+                    }
+                }
+            }
+
+            U.sleep(1000);
+
+            for (TestHadoopListener lsnr : lsnrs) {
+                lsnr.await(3_000);
+
+                assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size());
+            }
+        }
+        finally {
+            for (HadoopExternalCommunication comm : comms) {
+                if (comm != null)
+                    comm.stop();
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestHadoopListener implements HadoopMessageListener {
+        /** Received messages (array list is safe because executor has one thread). */
+        private Collection<TestMessage> msgs = new ArrayList<>();
+
+        /** Await latch. */
+        private CountDownLatch receiveLatch;
+
+        /**
+         * @param msgs Number of messages to await.
+         */
+        private TestHadoopListener(int msgs) {
+            receiveLatch = new CountDownLatch(msgs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
+            assert msg instanceof TestMessage;
+
+            msgs.add((TestMessage)msg);
+
+            receiveLatch.countDown();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+            // No-op.
+        }
+
+        /**
+         * @return Received messages.
+         */
+        public Collection<TestMessage> messages() {
+            return msgs;
+        }
+
+        /**
+         * @param millis Time to await.
+         * @throws InterruptedException If wait interrupted.
+         */
+        public void await(int millis) throws InterruptedException {
+            receiveLatch.await(millis, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestMessage implements HadoopMessage {
+        /** From index. */
+        private int from;
+
+        /** To index. */
+        private int to;
+
+        /**
+         * @param from From index.
+         * @param to To index.
+         */
+        private TestMessage(int from, int to) {
+            this.from = from;
+            this.to = to;
+        }
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestMessage() {
+            // No-op.
+        }
+
+        /**
+         * @return From index.
+         */
+        public int from() {
+            return from;
+        }
+
+        /**
+         * @return To index.
+         */
+        public int to() {
+            return to;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(from);
+            out.writeInt(to);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            from = in.readInt();
+            to = in.readInt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 822ab8f..1413c7e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -94,10 +94,10 @@ public class IgniteHadoopTestSuite extends TestSuite {
 
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTaskExecutionSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridHadoopV2JobSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopV2JobSelfTest.class.getName())));
 
-        suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSerializationWrapperSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSplitWrapperSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSerializationWrapperSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopSplitWrapperSelfTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV1Test.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV2Test.class.getName())));
@@ -107,7 +107,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopMapReduceEmbeddedSelfTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalTaskExecutionSelfTest.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalCommunicationSelfTest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName())));
 
         suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSortingTest.class.getName())));
 
