http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java index ff7b487..30f13ac 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.testframework.junits.common.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java index 24f5be2..feabea2 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.hadoop; import com.google.common.base.*; import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; import org.apache.ignite.fs.*; import org.apache.ignite.internal.processors.fs.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java index 1753472..f9ed542 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.hadoop; -import junit.framework.TestCase; -import org.apache.hadoop.mapreduce.Job; +import junit.framework.*; +import org.apache.hadoop.mapreduce.*; /** * 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java index a83dc08..1d1d62c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java @@ -17,10 +17,9 @@ package org.apache.ignite.internal.processors.hadoop; -import com.google.common.base.Joiner; +import com.google.common.base.*; import org.apache.ignite.*; import org.apache.ignite.fs.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.jobtracker.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java index 7b5f76b..4ffbfd9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java @@ -25,11 +25,10 @@ import 
org.apache.ignite.fs.mapreduce.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.fs.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.planner.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java index e7c0efe..ccc30a7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java @@ -18,15 +18,16 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.*; import java.io.*; import java.net.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; /** * Test file systems for the working directory multi-threading support. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java index de38b7c..8d20d99 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java index b735263..f1f316f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopJobTrackerSelfTest.java @@ -19,10 +19,9 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import 
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java index 4d8d610..61e18c3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceEmbeddedSelfTest.java @@ -27,7 +27,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.ignite.fs.*; import org.apache.ignite.internal.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.examples.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java index 5a3e536..f5bcffe 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopMapReduceTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.*;
 import org.apache.ignite.fs.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
 import org.apache.ignite.internal.util.lang.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java
new file mode 100644
index 0000000..5913d84
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopPopularWordsTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+
+import static com.google.common.collect.Maps.*;
+import static com.google.common.collect.MinMaxPriorityQueue.*;
+import static java.util.Collections.*;
+
+/**
+ * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
+ * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are
+ * output.
+ *
+ * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system
+ * path.
+ */
+public class GridHadoopPopularWordsTest {
+    /** GridGain home. */
+    private static final String IGNITE_HOME = U.getGridGainHome();
+
+    /** The path to the input directory. All files in that directory will be processed. */
+    private static final Path BOOKS_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/gridgain/grid/hadoop/books");
+
+    /** The path to the output directory. The result file will be written to this location. */
+    private static final Path RESULT_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/gridgain/grid/hadoop/output");
+
+    /** Popular books source dir in DFS. */
+    private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
+
+    /** Job result dir in DFS. */
+    private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
+
+    /** Path to the distributed file system configuration. */
+    private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** Top N words to select. */
+    private static final int POPULAR_WORDS_CNT = 10;
+
+    /**
+     * For each token in the input string the mapper emits a {word, 1} pair.
+     */
+    private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+        /** Constant value. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** The word converted into the Text. Reused for every emitted pair. */
+        private final Text word = new Text();
+
+        /**
+         * Emits an entry where the key is the word and the value is always 1.
+         *
+         * @param key the current position in the input file (not used here)
+         * @param val the text string
+         * @param ctx mapper context
+         * @throws IOException If writing to the context failed.
+         * @throws InterruptedException If writing to the context was interrupted.
+         */
+        @Override protected void map(LongWritable key, Text val, Context ctx)
+            throws IOException, InterruptedException {
+            // Get the mapped object.
+            final String line = val.toString();
+
+            // Splits the given string to words.
+            final String[] words = line.split("[^a-zA-Z0-9]");
+
+            for (final String w : words) {
+                // Only emit counts for longer words.
+                if (w.length() <= 3)
+                    continue;
+
+                word.set(w);
+
+                // Write the word into the context with the initial count equals 1.
+                ctx.write(word, ONE);
+            }
+        }
+    }
+
+    /**
+     * The reducer uses a priority queue to rank the words based on its number of occurrences.
+     */
+    private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** Bounded queue of {count, word} entries; holds at most {@code POPULAR_WORDS_CNT} highest counts. */
+        private final MinMaxPriorityQueue<Entry<Integer, String>> q;
+
+        /**
+         * Creates the reducer with a queue ordered by descending occurrence count.
+         */
+        TopNWordsReducer() {
+            q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
+                @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
+                    return o1.getKey().compareTo(o2.getKey());
+                }
+            })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
+        }
+
+        /**
+         * This method doesn't emit anything, but just keeps track of the top N words.
+         *
+         * @param key The word.
+         * @param vals The words counts.
+         * @param ctx Reducer context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
+            InterruptedException {
+            int sum = 0;
+
+            for (IntWritable val : vals)
+                sum += val.get();
+
+            q.add(immutableEntry(sum, key.toString()));
+        }
+
+        /**
+         * This method is called after all the word entries have been processed. It writes the accumulated
+         * statistics to the job output file.
+         *
+         * @param ctx The job context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+            IntWritable i = new IntWritable();
+
+            Text txt = new Text();
+
+            // Iterate in descending order of occurrence count.
+            while (!q.isEmpty()) {
+                Entry<Integer, String> e = q.removeFirst();
+
+                i.set(e.getKey());
+
+                txt.set(e.getValue());
+
+                ctx.write(txt, i);
+            }
+        }
+    }
+
+    /**
+     * Configures the Hadoop MapReduce job.
+     *
+     * @return Instance of the Hadoop MapRed job.
+     * @throws IOException If failed.
+     */
+    private Job createConfigBasedHadoopJob() throws IOException {
+        Job jobCfg = new Job();
+
+        Configuration cfg = jobCfg.getConfiguration();
+
+        // Use explicit configuration of distributed file system, if provided.
+        if (DFS_CFG != null)
+            cfg.addResource(U.resolveGridGainUrl(DFS_CFG));
+
+        jobCfg.setJobName("HadoopPopularWordExample");
+        jobCfg.setJarByClass(GridHadoopPopularWordsTest.class);
+        jobCfg.setInputFormatClass(TextInputFormat.class);
+        jobCfg.setOutputKeyClass(Text.class);
+        jobCfg.setOutputValueClass(IntWritable.class);
+        jobCfg.setMapperClass(TokenizingMapper.class);
+        jobCfg.setReducerClass(TopNWordsReducer.class);
+
+        FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
+        FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
+
+        // Local job tracker allows the only task per wave, but text input format
+        // replaces it with the calculated value based on input split size option.
+        if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
+            // Split job into tasks using 32MB split size.
+            FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
+            FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
+        }
+
+        return jobCfg;
+    }
+
+    /**
+     * Runs the Hadoop job.
+     *
+     * @return {@code True} if succeeded, {@code false} otherwise.
+     * @throws Exception If failed.
+     */
+    private boolean runWordCountConfigBasedHadoopJob() throws Exception {
+        Job job = createConfigBasedHadoopJob();
+
+        // Distributed file system this job will work with.
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+
+        X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
+
+        // Prepare input and output job directories.
+        prepareDirectories(fs);
+
+        long time = System.currentTimeMillis();
+
+        // Run job.
+        boolean res = job.waitForCompletion(true);
+
+        X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
+
+        // Move job results into local file system, so you can view calculated results.
+        publishResults(fs);
+
+        return res;
+    }
+
+    /**
+     * Prepare job's data: cleanup result directories that might have left over
+     * after previous runs, copy input files from the local file system into DFS.
+     *
+     * @param fs Distributed file system to use in job.
+     * @throws IOException If failed.
+     */
+    private void prepareDirectories(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
+
+        fs.delete(RESULT_DFS_DIR, true);
+
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
+    }
+
+    /**
+     * Publish job execution results into local file system, so you can view them.
+     *
+     * @param fs Distributed file system used in job.
+     * @throws IOException If failed.
+     */
+    private void publishResults(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        // The result directory is a "file:" path, so delete it through the file system that owns the
+        // path; the job file system may be a distributed one that does not serve local paths.
+        RESULT_LOCAL_DIR.getFileSystem(fs.getConf()).delete(RESULT_LOCAL_DIR, true);
+
+        X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
+    }
+
+    /**
+     * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
+     * occurrences of the word in the source files, the N most popular words are selected.
+     *
+     * @param args None.
+     */
+    public static void main(String[] args) {
+        int exitCode = 1;
+
+        try {
+            if (new GridHadoopPopularWordsTest().runWordCountConfigBasedHadoopJob())
+                exitCode = 0;
+            else
+                X.println(">>> Word count job did not complete successfully.");
+        }
+        catch (Exception e) {
+            X.println(">>> Failed to run word count example: " + e);
+        }
+
+        // Exit with a non-zero status when the job failed, so that callers can detect the failure.
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
index 11874f5..79b9965 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.serializer.*;
-import org.apache.ignite.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.testframework.junits.common.*;
 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java
index 51c5d11..689fb58 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java
+++
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSharedMap.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.hadoop; -import org.jdk8.backport.ConcurrentHashMap8; +import org.jdk8.backport.*; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.*; /** * For tests. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java index 2ca0b53..23884ef 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingExternalTest.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.hadoop; -import org.apache.ignite.hadoop.*; - /** * External test for sorting. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java index 9a79546..515d946 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSortingTest.java @@ -23,7 +23,6 @@ import org.apache.hadoop.io.serializer.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.util.typedef.*; import java.io.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java index b6e8f41..63b9e6e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskExecutionSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import 
org.apache.hadoop.mapreduce.*; @@ -25,9 +26,8 @@ import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.*; import org.apache.ignite.fs.*; -import org.apache.ignite.internal.*; import org.apache.ignite.fs.hadoop.v1.*; -import org.apache.ignite.hadoop.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java index 88ccbe3..c732c1f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java @@ -21,7 +21,6 @@ import com.google.common.base.*; import org.apache.hadoop.io.*; import org.apache.ignite.*; import org.apache.ignite.fs.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.examples.*; import org.apache.ignite.internal.processors.hadoop.v2.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java index 54197ac..15ac125 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.examples.*; import org.apache.ignite.internal.processors.hadoop.v2.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java index c2fb861..e48eb01 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java @@ -23,7 +23,6 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.examples.*; import org.apache.ignite.internal.processors.hadoop.v2.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java index 9d89aaf..5baa8cd 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java @@ -19,8 +19,7 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.planner.GridHadoopDefaultMapReducePlan; +import org.apache.ignite.internal.processors.hadoop.planner.*; import org.jetbrains.annotations.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java index 09aa23b..4e0aa9b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.ignite.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java new file mode 100644 index 0000000..cdbb809 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +import static org.junit.Assert.*; + +/** + * Utility class for tests. + */ +public class GridHadoopTestUtils { + /** + * Checks that job statistics file contains valid strings only. + * + * @param reader Buffered reader to get lines of job statistics. + * @return Amount of events. + * @throws IOException If failed. 
+ */ + public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { + Collection<String> phases = new HashSet<>(); + + phases.add("submit"); + phases.add("prepare"); + phases.add("start"); + phases.add("finish"); + phases.add("requestId"); + phases.add("responseId"); + + Collection<String> evtTypes = new HashSet<>(); + + evtTypes.add("JOB"); + evtTypes.add("SETUP"); + evtTypes.add("MAP"); + evtTypes.add("SHUFFLE"); + evtTypes.add("REDUCE"); + evtTypes.add("COMBINE"); + evtTypes.add("COMMIT"); + + long evtCnt = 0; + String line; + + Map<Long, String> reduceNodes = new HashMap<>(); + + while((line = reader.readLine()) != null) { + String[] splitLine = line.split(":"); + + //Try parse timestamp + Long.parseLong(splitLine[1]); + + String[] evt = splitLine[0].split(" "); + + assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); + + String phase; + + if ("JOB".equals(evt[0])) + phase = evt[1]; + else { + assertEquals(4, evt.length); + assertTrue("The node id is not defined", !F.isEmpty(evt[3])); + + long taskNum = Long.parseLong(evt[1]); + + if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { + String nodeId = reduceNodes.get(taskNum); + + if (nodeId == null) + reduceNodes.put(taskNum, evt[3]); + else + assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); + } + + phase = evt[2]; + } + + assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); + + evtCnt++; + } + + return evtCnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/36b439d9/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java index 
03d83a3..b201614 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java @@ -22,7 +22,6 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.io.serializer.*; import org.apache.hadoop.mapred.*; import org.apache.ignite.*; -import org.apache.ignite.hadoop.*; import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*;