http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java
new file mode 100644
index 0000000..558dec5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Configuration validation tests.
+ */
+public class HadoopValidationSelfTest extends HadoopAbstractSelfTest {
+    /** Peer class loading enabled flag. */
+    public boolean peerClassLoading;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+
+        peerClassLoading = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(peerClassLoading);
+
+        return cfg;
+    }
+
+    /**
+     * Ensure that Grid starts when all configuration parameters are valid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValid() throws Exception {
+        startGrids(1);
+    }
+}

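The peerClassLoading flag above is reset in afterTest() and consumed by getConfiguration(), so further test methods can start grids with peer class loading enabled. A minimal sketch of such a test, assuming it lives in the same class (the method name is hypothetical and not part of this commit):

    /**
     * Hypothetical companion test: ensure that Grid also starts when
     * peer class loading is enabled.
     *
     * @throws Exception If failed.
     */
    public void testValidWithPeerClassLoading() throws Exception {
        peerClassLoading = true; // Picked up by getConfiguration(gridName).

        startGrids(1);
    }
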
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java
deleted file mode 100644
index 40cf636..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-
-/**
- * Example job for testing hadoop task execution.
- */
-public class GridHadoopWordCount1 {
-    /**
-     * Entry point to start job.
-     * @param args command line parameters.
-     * @throws Exception if fails.
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length != 2) {
-            System.out.println("usage: [input] [output]");
-            System.exit(-1);
-        }
-
-        JobConf job = getJob(args[0], args[1]);
-
-        JobClient.runJob(job);
-    }
-
-    /**
-     * Gets fully configured JobConf instance.
-     *
-     * @param input input file name.
-     * @param output output directory name.
-     * @return Job configuration
-     */
-    public static JobConf getJob(String input, String output) {
-        JobConf conf = new JobConf(GridHadoopWordCount1.class);
-        conf.setJobName("wordcount");
-
-        conf.setOutputKeyClass(Text.class);
-        conf.setOutputValueClass(IntWritable.class);
-
-        setTasksClasses(conf, true, true, true);
-
-        FileInputFormat.setInputPaths(conf, new Path(input));
-        FileOutputFormat.setOutputPath(conf, new Path(output));
-
-        return conf;
-    }
-
-    /**
-     * Sets task classes with related info if needed into configuration object.
-     *
-     * @param jobConf Configuration to change.
-     * @param setMapper Option to set mapper and input format classes.
-     * @param setCombiner Option to set combiner class.
-     * @param setReducer Option to set reducer and output format classes.
-     */
-    public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) {
-        if (setMapper) {
-            jobConf.setMapperClass(GridHadoopWordCount1Map.class);
-            jobConf.setInputFormat(TextInputFormat.class);
-        }
-
-        if (setCombiner)
-            jobConf.setCombinerClass(GridHadoopWordCount1Reduce.class);
-
-        if (setReducer) {
-            jobConf.setReducerClass(GridHadoopWordCount1Reduce.class);
-            jobConf.setOutputFormat(TextOutputFormat.class);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java
deleted file mode 100644
index 5d8e0cc..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Mapper phase of WordCount job.
- */
-public class GridHadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
-    /** Writable integer constant of '1' is writing as count of found words. */
-    private static final IntWritable one = new IntWritable(1);
-
-    /** Writable container for writing word. */
-    private Text word = new Text();
-
-    /** Flag is to check that mapper was configured before run. */
-    private boolean wasConfigured;
-
-    /** {@inheritDoc} */
-    @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter)
-            throws IOException {
-
-        assert wasConfigured : "Mapper should be configured";
-
-        String line = val.toString();
-
-        StringTokenizer tokenizer = new StringTokenizer(line);
-
-        while (tokenizer.hasMoreTokens()) {
-            word.set(tokenizer.nextToken());
-
-            output.collect(word, one);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void configure(JobConf job) {
-        super.configure(job);
-
-        wasConfigured = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java
deleted file mode 100644
index 1b69a43..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Combiner and Reducer phase of WordCount job.
- */
-public class GridHadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
-    /** Flag is to check that mapper was configured before run. */
-    private boolean wasConfigured;
-
-    /** {@inheritDoc} */
-    @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
-            throws IOException {
-        assert wasConfigured : "Reducer should be configured";
-
-        int sum = 0;
-
-        while (values.hasNext())
-            sum += values.next().get();
-
-        output.collect(key, new IntWritable(sum));
-    }
-
-    @Override public void configure(JobConf job) {
-        super.configure(job);
-
-        wasConfigured = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java
deleted file mode 100644
index 6310363..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.lib.output.*;
-
-import java.io.*;
-
-/**
- * Example job for testing hadoop task execution.
- */
-public class GridHadoopWordCount2 {
-    /**
-     * Entry point to start job.
-     *
-     * @param args Command line parameters.
-     * @throws Exception If fails.
-     */
-    public static void main(String[] args) throws Exception {
-        if (args.length != 2) {
-            System.out.println("usage: [input] [output]");
-            System.exit(-1);
-        }
-
-        Job job = getJob(args[0], args[1]);
-
-        job.submit();
-    }
-
-    /**
-     * Gets fully configured Job instance.
-     *
-     * @param input Input file name.
-     * @param output Output directory name.
-     * @return Job instance.
-     * @throws IOException If fails.
-     */
-    public static Job getJob(String input, String output) throws IOException {
-        Job job = Job.getInstance();
-
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-
-        setTasksClasses(job, true, true, true);
-
-        FileInputFormat.setInputPaths(job, new Path(input));
-        FileOutputFormat.setOutputPath(job, new Path(output));
-
-        job.setJarByClass(GridHadoopWordCount2.class);
-
-        return job;
-    }
-
-    /**
-     * Sets task classes with related info if needed into configuration object.
-     *
-     * @param job Configuration to change.
-     * @param setMapper Option to set mapper and input format classes.
-     * @param setCombiner Option to set combiner class.
-     * @param setReducer Option to set reducer and output format classes.
-     */
-    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
-        if (setMapper) {
-            job.setMapperClass(GridHadoopWordCount2Mapper.class);
-            job.setInputFormatClass(TextInputFormat.class);
-        }
-
-        if (setCombiner)
-            job.setCombinerClass(GridHadoopWordCount2Reducer.class);
-
-        if (setReducer) {
-            job.setReducerClass(GridHadoopWordCount2Reducer.class);
-            job.setOutputFormatClass(TextOutputFormat.class);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java
deleted file mode 100644
index 849928a..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Mapper phase of WordCount job.
- */
-public class GridHadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable {
-    /** Writable container for writing word. */
-    private Text word = new Text();
-
-    /** Writable integer constant of '1' is writing as count of found words. */
-    private static final IntWritable one = new IntWritable(1);
-
-    /** Flag is to check that mapper was configured before run. */
-    private boolean wasConfigured;
-
-    /** Flag is to check that mapper was set up before run. */
-    private boolean wasSetUp;
-
-    /** {@inheritDoc} */
-    @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
-        assert wasConfigured : "Mapper should be configured";
-        assert wasSetUp : "Mapper should be set up";
-
-        StringTokenizer wordList = new StringTokenizer(val.toString());
-
-        while (wordList.hasMoreTokens()) {
-            word.set(wordList.nextToken());
-
-            ctx.write(word, one);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-        wasSetUp = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setConf(Configuration conf) {
-        wasConfigured = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Configuration getConf() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java
deleted file mode 100644
index 922bb2f..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-
-/**
- * Combiner and Reducer phase of WordCount job.
- */
-public class GridHadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable {
-    /** Writable container for writing sum of word counts. */
-    private IntWritable totalWordCnt = new IntWritable();
-
-    /** Flag is to check that mapper was configured before run. */
-    private boolean wasConfigured;
-
-    /** Flag is to check that mapper was set up before run. */
-    private boolean wasSetUp;
-
-    /** {@inheritDoc} */
-    @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
-        assert wasConfigured : "Reducer should be configured";
-        assert wasSetUp : "Reducer should be set up";
-
-        int wordCnt = 0;
-
-        for (IntWritable value : values)
-            wordCnt += value.get();
-
-        totalWordCnt.set(wordCnt);
-
-        ctx.write(key, totalWordCnt);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-        wasSetUp = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setConf(Configuration conf) {
-        wasConfigured = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Configuration getConf() {
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java
new file mode 100644
index 0000000..dd9058d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * Example job for testing Hadoop task execution.
+ */
+public class HadoopWordCount1 {
+    /**
+     * Entry point to start job.
+     * @param args Command line parameters.
+     * @throws Exception If fails.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println("usage: [input] [output]");
+            System.exit(-1);
+        }
+
+        JobConf job = getJob(args[0], args[1]);
+
+        JobClient.runJob(job);
+    }
+
+    /**
+     * Gets fully configured JobConf instance.
+     *
+     * @param input Input file name.
+     * @param output Output directory name.
+     * @return Job configuration.
+     */
+    public static JobConf getJob(String input, String output) {
+        JobConf conf = new JobConf(HadoopWordCount1.class);
+        conf.setJobName("wordcount");
+
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(IntWritable.class);
+
+        setTasksClasses(conf, true, true, true);
+
+        FileInputFormat.setInputPaths(conf, new Path(input));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        return conf;
+    }
+
+    /**
+     * Sets task classes with related info if needed into configuration object.
+     *
+     * @param jobConf Configuration to change.
+     * @param setMapper Option to set mapper and input format classes.
+     * @param setCombiner Option to set combiner class.
+     * @param setReducer Option to set reducer and output format classes.
+     */
+    public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) {
+        if (setMapper) {
+            jobConf.setMapperClass(HadoopWordCount1Map.class);
+            jobConf.setInputFormat(TextInputFormat.class);
+        }
+
+        if (setCombiner)
+            jobConf.setCombinerClass(HadoopWordCount1Reduce.class);
+
+        if (setReducer) {
+            jobConf.setReducerClass(HadoopWordCount1Reduce.class);
+            jobConf.setOutputFormat(TextOutputFormat.class);
+        }
+    }
+}

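For reference, a driver can use this class roughly as follows (a sketch with placeholder paths, not part of this commit; JobClient.runJob() blocks until the job finishes and throws IOException on failure):

    // Configure and run the old-API (org.apache.hadoop.mapred) word count.
    JobConf conf = HadoopWordCount1.getJob("/input/books.txt", "/output/wc1");

    RunningJob running = JobClient.runJob(conf);

    System.out.println("Job succeeded: " + running.isSuccessful());
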
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
new file mode 100644
index 0000000..c10a7fb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Mapper phase of WordCount job.
+ */
+public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+    /** Writable integer constant '1', written as the count for each found word. */
+    private static final IntWritable one = new IntWritable(1);
+
+    /** Writable container for writing word. */
+    private Text word = new Text();
+
+    /** Flag to check that the mapper was configured before the run. */
+    private boolean wasConfigured;
+
+    /** {@inheritDoc} */
+    @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter)
+            throws IOException {
+
+        assert wasConfigured : "Mapper should be configured";
+
+        String line = val.toString();
+
+        StringTokenizer tokenizer = new StringTokenizer(line);
+
+        while (tokenizer.hasMoreTokens()) {
+            word.set(tokenizer.nextToken());
+
+            output.collect(word, one);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void configure(JobConf job) {
+        super.configure(job);
+
+        wasConfigured = true;
+    }
+}

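The mapper tokenizes each input line on whitespace and emits a (word, 1) pair per token, reusing the word and one instances to avoid per-token allocations. A rough standalone smoke test, assuming an inline OutputCollector is acceptable (a sketch only, not part of this commit):

    HadoopWordCount1Map mapper = new HadoopWordCount1Map();

    mapper.configure(new JobConf()); // Required: map() asserts wasConfigured.

    mapper.map(new LongWritable(0), new Text("to be or not to be"),
        new OutputCollector<Text, IntWritable>() {
            @Override public void collect(Text key, IntWritable val) {
                System.out.println(key + " -> " + val); // to -> 1, be -> 1, ...
            }
        },
        Reporter.NULL);
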
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
new file mode 100644
index 0000000..76cd1c3
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Combiner and Reducer phase of WordCount job.
+ */
+public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+    /** Flag to check that the reducer was configured before the run. */
+    private boolean wasConfigured;
+
+    /** {@inheritDoc} */
+    @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
+            throws IOException {
+        assert wasConfigured : "Reducer should be configured";
+
+        int sum = 0;
+
+        while (values.hasNext())
+            sum += values.next().get();
+
+        output.collect(key, new IntWritable(sum));
+    }
+
+    @Override public void configure(JobConf job) {
+        super.configure(job);
+
+        wasConfigured = true;
+    }
+}

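The reducer sums the counts emitted by the mapper (or the partial sums produced when it runs as a combiner). A rough standalone check of the (word, [1, 1, 1]) -> (word, 3) contract (a sketch only, not part of this commit):

    HadoopWordCount1Reduce reducer = new HadoopWordCount1Reduce();

    reducer.configure(new JobConf()); // Required: reduce() asserts wasConfigured.

    Iterator<IntWritable> ones = Arrays.asList(
        new IntWritable(1), new IntWritable(1), new IntWritable(1)).iterator();

    reducer.reduce(new Text("word"), ones,
        new OutputCollector<Text, IntWritable>() {
            @Override public void collect(Text key, IntWritable sum) {
                System.out.println(key + " -> " + sum); // word -> 3
            }
        },
        Reporter.NULL);
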
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
new file mode 100644
index 0000000..dc68df7
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+
+import java.io.*;
+
+/**
+ * Example job for testing Hadoop task execution.
+ */
+public class HadoopWordCount2 {
+    /**
+     * Entry point to start job.
+     *
+     * @param args Command line parameters.
+     * @throws Exception If fails.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println("usage: [input] [output]");
+            System.exit(-1);
+        }
+
+        Job job = getJob(args[0], args[1]);
+
+        job.submit();
+    }
+
+    /**
+     * Gets fully configured Job instance.
+     *
+     * @param input Input file name.
+     * @param output Output directory name.
+     * @return Job instance.
+     * @throws IOException If fails.
+     */
+    public static Job getJob(String input, String output) throws IOException {
+        Job job = Job.getInstance();
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        setTasksClasses(job, true, true, true);
+
+        FileInputFormat.setInputPaths(job, new Path(input));
+        FileOutputFormat.setOutputPath(job, new Path(output));
+
+        job.setJarByClass(HadoopWordCount2.class);
+
+        return job;
+    }
+
+    /**
+     * Sets task classes with related info if needed into configuration object.
+     *
+     * @param job Configuration to change.
+     * @param setMapper Option to set mapper and input format classes.
+     * @param setCombiner Option to set combiner class.
+     * @param setReducer Option to set reducer and output format classes.
+     */
+    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
+        if (setMapper) {
+            job.setMapperClass(HadoopWordCount2Mapper.class);
+            job.setInputFormatClass(TextInputFormat.class);
+        }
+
+        if (setCombiner)
+            job.setCombinerClass(HadoopWordCount2Reducer.class);
+
+        if (setReducer) {
+            job.setReducerClass(HadoopWordCount2Reducer.class);
+            job.setOutputFormatClass(TextOutputFormat.class);
+        }
+    }
+}

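Unlike HadoopWordCount1, this driver targets the new org.apache.hadoop.mapreduce API and submits asynchronously via job.submit(). A caller that needs to block until the job finishes could instead do the following (a sketch with placeholder paths, not part of this commit):

    Job job = HadoopWordCount2.getJob("/input/books.txt", "/output/wc2");

    // waitForCompletion(true) prints progress and returns false on failure.
    if (!job.waitForCompletion(true))
        System.exit(1);
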
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
new file mode 100644
index 0000000..6ca7ccd
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Mapper phase of WordCount job.
+ */
+public class HadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable {
+    /** Writable container for writing word. */
+    private Text word = new Text();
+
+    /** Writable integer constant '1', written as the count for each found word. */
+    private static final IntWritable one = new IntWritable(1);
+
+    /** Flag to check that the mapper was configured before the run. */
+    private boolean wasConfigured;
+
+    /** Flag to check that the mapper was set up before the run. */
+    private boolean wasSetUp;
+
+    /** {@inheritDoc} */
+    @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+        assert wasConfigured : "Mapper should be configured";
+        assert wasSetUp : "Mapper should be set up";
+
+        StringTokenizer wordList = new StringTokenizer(val.toString());
+
+        while (wordList.hasMoreTokens()) {
+            word.set(wordList.nextToken());
+
+            ctx.write(word, one);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        wasSetUp = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setConf(Configuration conf) {
+        wasConfigured = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Configuration getConf() {
+        return null;
+    }
+}

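Note how wasConfigured gets set: the class implements Configurable, and Hadoop instantiates task classes through ReflectionUtils.newInstance(), which calls setConf() on Configurable objects right after construction, before any map() invocation. A sketch of that mechanism, assuming a default Configuration:

    // ReflectionUtils.newInstance(cls, conf) invokes setConf(conf) when the
    // created object implements Configurable, flipping wasConfigured here.
    HadoopWordCount2Mapper mapper = ReflectionUtils.newInstance(
        HadoopWordCount2Mapper.class, new Configuration());
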
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
new file mode 100644
index 0000000..fedaaf9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+
+/**
+ * Combiner and Reducer phase of WordCount job.
+ */
+public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable {
+    /** Writable container for writing sum of word counts. */
+    private IntWritable totalWordCnt = new IntWritable();
+
+    /** Flag to check that the reducer was configured before the run. */
+    private boolean wasConfigured;
+
+    /** Flag to check that the reducer was set up before the run. */
+    private boolean wasSetUp;
+
+    /** {@inheritDoc} */
+    @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
+        assert wasConfigured : "Reducer should be configured";
+        assert wasSetUp : "Reducer should be set up";
+
+        int wordCnt = 0;
+
+        for (IntWritable value : values)
+            wordCnt += value.get();
+
+        totalWordCnt.set(wordCnt);
+
+        ctx.write(key, totalWordCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        wasSetUp = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setConf(Configuration conf) {
+        wasConfigured = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Configuration getConf() {
+        return null;
+    }
+
+}

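This class is registered as both combiner and reducer in HadoopWordCount2.setTasksClasses() above. That reuse is safe only because integer addition is associative and commutative, so map-side partial sums do not change the final totals:

    // Safe reuse for word count: sum(sum(a, b), c) == sum(a, sum(b, c)).
    job.setCombinerClass(HadoopWordCount2Reducer.class);
    job.setReducerClass(HadoopWordCount2Reducer.class);
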
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
deleted file mode 100644
index 716fe19..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.commons.collections.comparators.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Abstract class for maps test.
- */
-public abstract class GridHadoopAbstractMapTest extends GridCommonAbstractTest {
-    /**
-     * Test task context.
-     */
-    protected static class TaskContext extends GridHadoopTaskContext {
-        /**
-         */
-        protected TaskContext() {
-            super(null, null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopCounters counters() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException {
-            assert false;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException {
-            return new GridHadoopWritableSerialization(IntWritable.class);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException {
-            return new GridHadoopWritableSerialization(IntWritable.class);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Comparator<Object> sortComparator() {
-            return ComparableComparator.getInstance();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Comparator<Object> groupComparator() {
-            return ComparableComparator.getInstance();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() throws IgniteCheckedException {
-            assert false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            assert false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
-            assert false;
-        }
-
-        @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
-            assert false;
-        }
-    }
-
-    /**
-     * Test job info.
-     */
-    protected static class JobInfo implements GridHadoopJobInfo {
-        /** {@inheritDoc} */
-        @Nullable @Override public String property(String name) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasCombiner() {
-            assert false;
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasReducer() {
-            assert false;
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
-            assert false;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int reducers() {
-            assert false;
-
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String jobName() {
-            assert false;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String user() {
-            assert false;
-
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
deleted file mode 100644
index 88dfd2b..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import com.google.common.collect.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- *
- */
-public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstractMapTest {
-    /** */
-    public void testMapSimple() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-//        mem.listen(new GridOffHeapEventListener() {
-//            @Override public void onEvent(GridOffHeapEvent evt) {
-//                if (evt == GridOffHeapEvent.ALLOCATE)
-//                    U.dumpStack();
-//            }
-//        });
-
-        Random rnd = new Random();
-
-        int mapSize = 16 << rnd.nextInt(3);
-
-        GridHadoopJobInfo job = new JobInfo();
-
-        GridHadoopTaskContext taskCtx = new TaskContext();
-
-        GridHadoopConcurrentHashMultimap m = new GridHadoopConcurrentHashMultimap(job, mem, mapSize);
-
-        GridHadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
-
-        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
-
-        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
-            int key = rnd.nextInt(mapSize);
-            int val = rnd.nextInt();
-
-            a.write(new IntWritable(key), new IntWritable(val));
-            mm.put(key, val);
-
-            X.println("k: " + key + " v: " + val);
-
-            a.close();
-
-            check(m, mm, vis, taskCtx);
-
-            a = m.startAdding(taskCtx);
-        }
-
-//        a.add(new IntWritable(10), new IntWritable(2));
-//        mm.put(10, 2);
-//        check(m, mm);
-
-        a.close();
-
-        X.println("Alloc: " + mem.allocatedSize());
-
-        m.close();
-
-        assertEquals(0, mem.allocatedSize());
-    }
-
-    private void check(GridHadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
-        final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) throws Exception {
-        final GridHadoopTaskInput in = m.input(taskCtx);
-
-        Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
-        int keys = 0;
-
-        while (in.next()) {
-            keys++;
-
-            IntWritable k = (IntWritable)in.key();
-
-            assertNotNull(k);
-
-            Deque<Integer> vs = new LinkedList<>();
-
-            Iterator<?> it = in.values();
-
-            while (it.hasNext())
-                vs.addFirst(((IntWritable) it.next()).get());
-
-            Collection<Integer> exp = mmm.get(k.get());
-
-            assertEquals(exp, vs);
-        }
-
-        assertEquals(mmm.size(), keys);
-
-        assertEquals(m.keys(), keys);
-
-        X.println("keys: " + keys + " cap: " + m.capacity());
-
-        // Check visitor.
-
-        final byte[] buf = new byte[4];
-
-        final GridDataInput dataInput = new GridUnsafeDataInput();
-
-        m.visit(false, new GridHadoopConcurrentHashMultimap.Visitor() {
-            /** */
-            IntWritable key = new IntWritable();
-
-            /** */
-            IntWritable val = new IntWritable();
-
-            @Override public void onKey(long keyPtr, int keySize) {
-                read(keyPtr, keySize, key);
-            }
-
-            @Override public void onValue(long valPtr, int valSize) {
-                read(valPtr, valSize, val);
-
-                vis.put(key.get(), val.get());
-            }
-
-            private void read(long ptr, int size, Writable w) {
-                assert size == 4 : size;
-
-                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
-
-                dataInput.bytes(buf, size);
-
-                try {
-                    w.readFields(dataInput);
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-//        X.println("vis: " + vis);
-
-        assertEquals(mm, vis);
-
-        in.close();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testMultiThreaded() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-        X.println("___ Started");
-
-        Random rnd = new GridRandom();
-
-        for (int i = 0; i < 20; i++) {
-            GridHadoopJobInfo job = new JobInfo();
-
-            final GridHadoopTaskContext taskCtx = new TaskContext();
-
-            final GridHadoopConcurrentHashMultimap m = new GridHadoopConcurrentHashMultimap(job, mem, 16);
-
-            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
-
-            X.println("___ MT");
-
-            multithreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    X.println("___ TH in");
-
-                    Random rnd = new GridRandom();
-
-                    IntWritable key = new IntWritable();
-                    IntWritable val = new IntWritable();
-
-                    GridHadoopMultimap.Adder a = m.startAdding(taskCtx);
-
-                    for (int i = 0; i < 50000; i++) {
-                        int k = rnd.nextInt(32000);
-                        int v = rnd.nextInt();
-
-                        key.set(k);
-                        val.set(v);
-
-                        a.write(key, val);
-
-                        Collection<Integer> list = mm.get(k);
-
-                        if (list == null) {
-                            list = new ConcurrentLinkedQueue<>();
-
-                            Collection<Integer> old = mm.putIfAbsent(k, list);
-
-                            if (old != null)
-                                list = old;
-                        }
-
-                        list.add(v);
-                    }
-
-                    a.close();
-
-                    X.println("___ TH out");
-
-                    return null;
-                }
-            }, 3 + rnd.nextInt(27));
-
-            X.println("___ Check: " + m.capacity());
-
-            assertEquals(mm.size(), m.keys());
-
-            assertTrue(m.capacity() > 32000);
-
-            GridHadoopTaskInput in = m.input(taskCtx);
-
-            while (in.next()) {
-                IntWritable key = (IntWritable) in.key();
-
-                Iterator<?> valsIter = in.values();
-
-                Collection<Integer> vals = mm.remove(key.get());
-
-                assertNotNull(vals);
-
-                while (valsIter.hasNext()) {
-                    IntWritable val = (IntWritable) valsIter.next();
-
-                    assertTrue(vals.remove(val.get()));
-                }
-
-                assertTrue(vals.isEmpty());
-            }
-
-            in.close();
-            m.close();
-
-            assertEquals(0, mem.allocatedSize());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java
deleted file mode 100644
index 92177ad..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import com.google.common.collect.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Hash multimap tests.
- */
-public class GridHadoopHashMapSelfTest extends GridHadoopAbstractMapTest {
-
-    /** Disabled by the leading underscore: manual off-heap allocation benchmark. */
-    public void _testAllocation() throws Exception {
-        final GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-        long size = 3L * 1024 * 1024 * 1024;
-
-        final long chunk = 16;// * 1024;
-
-        final int page = 4 * 1024;
-
-        final int writes = chunk < page ? 1 : (int)(chunk / page);
-
-        final long cnt = size / chunk;
-
-        assert cnt < Integer.MAX_VALUE;
-
-        final int threads = 4;
-
-        long start = System.currentTimeMillis();
-
-        multithreaded(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                int cnt0 = (int)(cnt / threads);
-
-                for (int i = 0; i < cnt0; i++) {
-                    long ptr = mem.allocate(chunk);
-
-                    for (int j = 0; j < writes; j++)
-                        mem.writeInt(ptr + j * page, 100500);
-                }
-
-                return null;
-            }
-        }, threads);
-
-        X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + 
mem.allocatedSize() + " cnt: " + cnt);
-
-        Thread.sleep(30000);
-    }
-
-
-    /** */
-    public void testMapSimple() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-//        mem.listen(new GridOffHeapEventListener() {
-//            @Override public void onEvent(GridOffHeapEvent evt) {
-//                if (evt == GridOffHeapEvent.ALLOCATE)
-//                    U.dumpStack();
-//            }
-//        });
-
-        Random rnd = new Random();
-
-        int mapSize = 16 << rnd.nextInt(3);
-
-        GridHadoopTaskContext taskCtx = new TaskContext();
-
-        final GridHadoopHashMultimap m = new GridHadoopHashMultimap(new JobInfo(), mem, mapSize);
-
-        GridHadoopMultimap.Adder a = m.startAdding(taskCtx);
-
-        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-
-        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
-            int key = rnd.nextInt(mapSize);
-            int val = rnd.nextInt();
-
-            a.write(new IntWritable(key), new IntWritable(val));
-            mm.put(key, val);
-
-            X.println("k: " + key + " v: " + val);
-
-            a.close();
-
-            check(m, mm, taskCtx);
-
-            a = m.startAdding(taskCtx);
-        }
-
-//        a.add(new IntWritable(10), new IntWritable(2));
-//        mm.put(10, 2);
-//        check(m, mm);
-
-        a.close();
-
-        X.println("Alloc: " + mem.allocatedSize());
-
-        m.close();
-
-        assertEquals(0, mem.allocatedSize());
-    }
-
-    private void check(GridHadoopHashMultimap m, Multimap<Integer, Integer> mm, GridHadoopTaskContext taskCtx) throws Exception {
-        final GridHadoopTaskInput in = m.input(taskCtx);
-
-        Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
-        int keys = 0;
-
-        while (in.next()) {
-            keys++;
-
-            IntWritable k = (IntWritable)in.key();
-
-            assertNotNull(k);
-
-            ArrayList<Integer> vs = new ArrayList<>();
-
-            Iterator<?> it = in.values();
-
-            while (it.hasNext())
-                vs.add(((IntWritable) it.next()).get());
-
-            Collection<Integer> exp = mmm.get(k.get());
-
-            assertEquals(sorted(exp), sorted(vs));
-        }
-
-        X.println("keys: " + keys + " cap: " + m.capacity());
-
-        assertEquals(mmm.size(), keys);
-
-        assertEquals(m.keys(), keys);
-
-        in.close();
-    }
-
-    private GridLongList sorted(Collection<Integer> col) {
-        GridLongList lst = new GridLongList(col.size());
-
-        for (Integer i : col)
-            lst.add(i);
-
-        return lst.sort();
-    }
-}
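
The check(...) above verifies per-key values order-insensitively: both the expected and actual values are sorted into a GridLongList before comparison. A JDK-only sketch of the same comparison, assuming nothing beyond java.util (sameValues is a hypothetical helper, not part of the test):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    final class SortedCompare {
        /** True when both collections hold exactly the same values, ignoring order. */
        static boolean sameValues(Collection<Integer> exp, Collection<Integer> act) {
            List<Integer> e = new ArrayList<>(exp);
            List<Integer> a = new ArrayList<>(act);

            Collections.sort(e);
            Collections.sort(a);

            return e.equals(a);
        }
    }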

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java
deleted file mode 100644
index 6ba00ad..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import com.google.common.collect.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.lang.Math.*;
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- * Skip list tests.
- */
-public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest {
-    /**
-     *
-     */
-    public void testLevel() {
-        Random rnd = new GridRandom();
-
-        int[] levelsCnts = new int[32];
-
-        int all = 10000;
-
-        for (int i = 0; i < all; i++) {
-            int level = GridHadoopSkipList.randomLevel(rnd);
-
-            levelsCnts[level]++;
-        }
-
-        X.println("Distribution: " + Arrays.toString(levelsCnts));
-
-        for (int level = 0; level < levelsCnts.length; level++) {
-            int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);
-
-            double precision = 0.72 / Math.max(32 >>> level, 1);
-
-            int sigma = max((int)ceil(precision * exp), 5);
-
-            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precision +
-                " sigma: " + sigma);
-
-            assertTrue(abs(exp - levelsCnts[level]) <= sigma);
-        }
-    }
-
-    /** */
-    public void testMapSimple() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-//        mem.listen(new GridOffHeapEventListener() {
-//            @Override public void onEvent(GridOffHeapEvent evt) {
-//                if (evt == GridOffHeapEvent.ALLOCATE)
-//                    U.dumpStack();
-//            }
-//        });
-
-        Random rnd = new Random();
-
-        int mapSize = 16 << rnd.nextInt(6);
-
-        GridHadoopJobInfo job = new JobInfo();
-
-        GridHadoopTaskContext taskCtx = new TaskContext();
-
-        GridHadoopMultimap m = new GridHadoopSkipList(job, mem);
-
-        GridHadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
-
-        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
-
-        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
-            int key = rnd.nextInt(mapSize);
-            int val = rnd.nextInt();
-
-            a.write(new IntWritable(key), new IntWritable(val));
-            mm.put(key, val);
-
-            X.println("k: " + key + " v: " + val);
-
-            a.close();
-
-            check(m, mm, vis, taskCtx);
-
-            a = m.startAdding(taskCtx);
-        }
-
-//        a.add(new IntWritable(10), new IntWritable(2));
-//        mm.put(10, 2);
-//        check(m, mm);
-
-        a.close();
-
-        X.println("Alloc: " + mem.allocatedSize());
-
-        m.close();
-
-        assertEquals(0, mem.allocatedSize());
-    }
-
-    private void check(GridHadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx)
-        throws Exception {
-        final GridHadoopTaskInput in = m.input(taskCtx);
-
-        Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
-        int keys = 0;
-
-        int prevKey = Integer.MIN_VALUE;
-
-        while (in.next()) {
-            keys++;
-
-            IntWritable k = (IntWritable)in.key();
-
-            assertNotNull(k);
-
-            assertTrue(k.get() > prevKey);
-
-            prevKey = k.get();
-
-            Deque<Integer> vs = new LinkedList<>();
-
-            Iterator<?> it = in.values();
-
-            while (it.hasNext())
-                vs.addFirst(((IntWritable) it.next()).get());
-
-            Collection<Integer> exp = mmm.get(k.get());
-
-            assertEquals(exp, vs);
-        }
-
-        assertEquals(mmm.size(), keys);
-
-//!        assertEquals(m.keys(), keys);
-
-        // Check visitor.
-
-        final byte[] buf = new byte[4];
-
-        final GridDataInput dataInput = new GridUnsafeDataInput();
-
-        m.visit(false, new GridHadoopConcurrentHashMultimap.Visitor() {
-            /** */
-            IntWritable key = new IntWritable();
-
-            /** */
-            IntWritable val = new IntWritable();
-
-            @Override public void onKey(long keyPtr, int keySize) {
-                read(keyPtr, keySize, key);
-            }
-
-            @Override public void onValue(long valPtr, int valSize) {
-                read(valPtr, valSize, val);
-
-                vis.put(key.get(), val.get());
-            }
-
-            private void read(long ptr, int size, Writable w) {
-                assert size == 4 : size;
-
-                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
-
-                dataInput.bytes(buf, size);
-
-                try {
-                    w.readFields(dataInput);
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-//        X.println("vis: " + vis);
-
-        assertEquals(mm, vis);
-
-        in.close();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testMultiThreaded() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-        X.println("___ Started");
-
-        Random rnd = new GridRandom();
-
-        for (int i = 0; i < 20; i++) {
-            GridHadoopJobInfo job = new JobInfo();
-
-            final GridHadoopTaskContext taskCtx = new TaskContext();
-
-            final GridHadoopMultimap m = new GridHadoopSkipList(job, mem);
-
-            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
-
-            X.println("___ MT");
-
-            multithreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    X.println("___ TH in");
-
-                    Random rnd = new GridRandom();
-
-                    IntWritable key = new IntWritable();
-                    IntWritable val = new IntWritable();
-
-                    GridHadoopMultimap.Adder a = m.startAdding(taskCtx);
-
-                    for (int i = 0; i < 50000; i++) {
-                        int k = rnd.nextInt(32000);
-                        int v = rnd.nextInt();
-
-                        key.set(k);
-                        val.set(v);
-
-                        a.write(key, val);
-
-                        Collection<Integer> list = mm.get(k);
-
-                        if (list == null) {
-                            list = new ConcurrentLinkedQueue<>();
-
-                            Collection<Integer> old = mm.putIfAbsent(k, list);
-
-                            if (old != null)
-                                list = old;
-                        }
-
-                        list.add(v);
-                    }
-
-                    a.close();
-
-                    X.println("___ TH out");
-
-                    return null;
-                }
-            }, 3 + rnd.nextInt(27));
-
-            GridHadoopTaskInput in = m.input(taskCtx);
-
-            int prevKey = Integer.MIN_VALUE;
-
-            while (in.next()) {
-                IntWritable key = (IntWritable)in.key();
-
-                assertTrue(key.get() > prevKey);
-
-                prevKey = key.get();
-
-                Iterator<?> valsIter = in.values();
-
-                Collection<Integer> vals = mm.remove(key.get());
-
-                assertNotNull(vals);
-
-                while (valsIter.hasNext()) {
-                    IntWritable val = (IntWritable) valsIter.next();
-
-                    assertTrue(vals.remove(val.get()));
-                }
-
-                assertTrue(vals.isEmpty());
-            }
-
-            in.close();
-            m.close();
-
-            assertEquals(0, mem.allocatedSize());
-        }
-    }
-}
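
testLevel above expects level L to be hit roughly all >>> (L + 1) times out of 10000 draws, i.e. a geometric distribution where each extra level halves the count. A hedged sketch of a coin-flip level generator with exactly that distribution (randomLevel here is a hypothetical stand-in; the actual GridHadoopSkipList.randomLevel implementation is not shown in this diff):

    import java.util.Arrays;
    import java.util.Random;

    public class LevelDistributionSketch {
        /** Coin-flip level draw: P(level == L) = 2^-(L+1), capped at 31. */
        static int randomLevel(Random rnd) {
            int level = 0;

            while (level < 31 && rnd.nextBoolean())
                level++;

            return level;
        }

        public static void main(String[] args) {
            Random rnd = new Random();

            int[] cnts = new int[32];

            int all = 10000;

            for (int i = 0; i < all; i++)
                cnts[randomLevel(rnd)]++;

            // Expect roughly 5000, 2500, 1250, ... matching all >>> (level + 1).
            System.out.println(Arrays.toString(cnts));
        }
    }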

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
new file mode 100644
index 0000000..b4ed5e1
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.commons.collections.comparators.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Abstract base class for the map tests.
+ */
+public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
+    /**
+     * Test task context.
+     */
+    protected static class TaskContext extends HadoopTaskContext {
+        /**
+         */
+        protected TaskContext() {
+            super(null, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopCounters counters() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+            return new HadoopWritableSerialization(IntWritable.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+            return new HadoopWritableSerialization(IntWritable.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Comparator<Object> sortComparator() {
+            return ComparableComparator.getInstance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Comparator<Object> groupComparator() {
+            return ComparableComparator.getInstance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() throws IgniteCheckedException {
+            assert false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            assert false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+            assert false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+            assert false;
+        }
+    }
+
+    /**
+     * Test job info.
+     */
+    protected static class JobInfo implements HadoopJobInfo {
+        /** {@inheritDoc} */
+        @Nullable @Override public String property(String name) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasCombiner() {
+            assert false;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasReducer() {
+            assert false;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int reducers() {
+            assert false;
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String jobName() {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String user() {
+            assert false;
+
+            return null;
+        }
+    }
+}
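
HadoopAbstractMapTest above supplies the collection tests with deliberately thin fakes: TaskContext implements only the serialization and comparator hooks the multimaps actually use and trips assert false everywhere else, so an unexpected call fails loudly instead of silently returning null. A minimal sketch of that fail-fast stub idiom (Dependency and both method names are hypothetical, and assertions must be enabled with -ea):

    /** Hypothetical collaborator with one method the test exercises and one it must not. */
    interface Dependency {
        String neededByTest();
        String neverCalled();
    }

    /** Fail-fast stub: implement the needed path, assert on everything else. */
    class FailFastStub implements Dependency {
        @Override public String neededByTest() {
            return "canned value"; // The only behavior the unit under test relies on.
        }

        @Override public String neverCalled() {
            assert false : "unexpected interaction";

            return null; // Unreachable when assertions are enabled.
        }
    }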

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
new file mode 100644
index 0000000..ae6bafa
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+
+/**
+ * Concurrent hash multimap tests.
+ */
+public class HadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
+    /** */
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        int mapSize = 16 << rnd.nextInt(3);
+
+        HadoopJobInfo job = new JobInfo();
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
+
+        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            a.close();
+
+            check(m, mm, vis, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        assertEquals(0, mem.allocatedSize());
+    }
+
+    private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
+        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            Deque<Integer> vs = new LinkedList<>();
+
+            Iterator<?> it = in.values();
+
+            while (it.hasNext())
+                vs.addFirst(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
+            assertEquals(exp, vs);
+        }
+
+        assertEquals(mmm.size(), keys);
+
+        assertEquals(m.keys(), keys);
+
+        X.println("keys: " + keys + " cap: " + m.capacity());
+
+        // Check visitor.
+
+        final byte[] buf = new byte[4];
+
+        final GridDataInput dataInput = new GridUnsafeDataInput();
+
+        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+            /** */
+            IntWritable key = new IntWritable();
+
+            /** */
+            IntWritable val = new IntWritable();
+
+            @Override public void onKey(long keyPtr, int keySize) {
+                read(keyPtr, keySize, key);
+            }
+
+            @Override public void onValue(long valPtr, int valSize) {
+                read(valPtr, valSize, val);
+
+                vis.put(key.get(), val.get());
+            }
+
+            private void read(long ptr, int size, Writable w) {
+                assert size == 4 : size;
+
+                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
+
+                dataInput.bytes(buf, size);
+
+                try {
+                    w.readFields(dataInput);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+//        X.println("vis: " + vis);
+
+        assertEquals(mm, vis);
+
+        in.close();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        X.println("___ Started");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < 20; i++) {
+            HadoopJobInfo job = new JobInfo();
+
+            final HadoopTaskContext taskCtx = new TaskContext();
+
+            final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
+
+            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+            X.println("___ MT");
+
+            multithreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    X.println("___ TH in");
+
+                    Random rnd = new GridRandom();
+
+                    IntWritable key = new IntWritable();
+                    IntWritable val = new IntWritable();
+
+                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+                    for (int i = 0; i < 50000; i++) {
+                        int k = rnd.nextInt(32000);
+                        int v = rnd.nextInt();
+
+                        key.set(k);
+                        val.set(v);
+
+                        a.write(key, val);
+
+                        Collection<Integer> list = mm.get(k);
+
+                        if (list == null) {
+                            list = new ConcurrentLinkedQueue<>();
+
+                            Collection<Integer> old = mm.putIfAbsent(k, list);
+
+                            if (old != null)
+                                list = old;
+                        }
+
+                        list.add(v);
+                    }
+
+                    a.close();
+
+                    X.println("___ TH out");
+
+                    return null;
+                }
+            }, 3 + rnd.nextInt(27));
+
+            X.println("___ Check: " + m.capacity());
+
+            assertEquals(mm.size(), m.keys());
+
+            assertTrue(m.capacity() > 32000);
+
+            HadoopTaskInput in = m.input(taskCtx);
+
+            while (in.next()) {
+                IntWritable key = (IntWritable) in.key();
+
+                Iterator<?> valsIter = in.values();
+
+                Collection<Integer> vals = mm.remove(key.get());
+
+                assertNotNull(vals);
+
+                while (valsIter.hasNext()) {
+                    IntWritable val = (IntWritable) valsIter.next();
+
+                    assertTrue(vals.remove(val.get()));
+                }
+
+                assertTrue(vals.isEmpty());
+            }
+
+            in.close();
+            m.close();
+
+            assertEquals(0, mem.allocatedSize());
+        }
+    }
+}
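
testMultiThreaded drives 3 to 29 writer threads into the off-heap multimap while mirroring every write into an on-heap "shadow" ConcurrentHashMap, then drains the multimap and checks that the shadow ends up empty. The racy first insert per key is resolved with putIfAbsent, exactly as in the loop above; a self-contained JDK-only sketch of that idiom (ShadowMap and record are hypothetical names):

    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ConcurrentMap;

    final class ShadowMap {
        private final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();

        /** Safe under concurrent callers: putIfAbsent decides which thread's queue wins. */
        void record(int k, int v) {
            Collection<Integer> list = mm.get(k);

            if (list == null) {
                list = new ConcurrentLinkedQueue<>();

                Collection<Integer> old = mm.putIfAbsent(k, list);

                if (old != null)
                    list = old; // Lost the race; adopt the winner's queue.
            }

            list.add(v); // ConcurrentLinkedQueue.add is thread-safe.
        }
    }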
