http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 486b856..6242ecc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.examples.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@ -56,7 +57,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { igfs.mkdirs(inDir); - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 ); @@ -82,11 +83,11 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { // File system coordinates. setupFileSystems(jobConf); - GridHadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); + HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); Job job = Job.getInstance(jobConf); - GridHadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer); + HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); @@ -94,9 +95,9 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); - job.setJarByClass(GridHadoopWordCount2.class); + job.setJarByClass(HadoopWordCount2.class); - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); @@ -121,8 +122,8 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { * @param jobId Job id. 
* @throws IgniteCheckedException */ - private void checkJobStatistics(GridHadoopJobId jobId) throws IgniteCheckedException, IOException { - GridHadoopCounters cntrs = grid(0).hadoop().counters(jobId); + private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException { + HadoopCounters cntrs = grid(0).hadoop().counters(jobId); HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); @@ -191,6 +192,6 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); - assertEquals(apiEvtCnt, GridHadoopTestUtils.simpleCheckJobStatFile(reader)); + assertEquals(apiEvtCnt, HadoopTestUtils.simpleCheckJobStatFile(reader)); } }
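Before the next diff, note the submission pattern these tests share after the rename. A minimal sketch follows, assuming the test harness visible in the diffs: grid(0) returns a started test node (from the abstract test base class) and createJobInfo(...) is statically imported from HadoopUtils, as in these test files.

// Sketch of the renamed submission API (GridHadoopJobId -> HadoopJobId).
// grid(0) and createJobInfo(...) are test-harness helpers, as noted above.
Job job = Job.getInstance(jobConf);

HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));

fut.get(); // Block until the MapReduce job completes.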
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java new file mode 100644 index 0000000..a2f2ac3 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.collect.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; +import java.util.Map.*; + +import static com.google.common.collect.Maps.*; +import static com.google.common.collect.MinMaxPriorityQueue.*; +import static java.util.Collections.*; + +/** + * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than + * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are + * output. + * + * NOTE: in order to run this example on Windows please ensure that Cygwin is installed and available in the system + * path. + */ +public class HadoopPopularWordsTest { + /** Ignite home. */ + private static final String IGNITE_HOME = U.getIgniteHome(); + + /** The path to the input directory. All files in that directory will be processed. */ + private static final Path BOOKS_LOCAL_DIR = + new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books"); + + /** The path to the output directory. The result file will be written to this location. */ + private static final Path RESULT_LOCAL_DIR = + new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output"); + + /** Popular books source dir in DFS. */ + private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in"); + + /** Popular books result dir in DFS. */ + private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out"); + + /** Path to the distributed file system configuration.
*/ + private static final String DFS_CFG = "examples/config/filesystem/core-site.xml"; + + /** Top N words to select **/ + private static final int POPULAR_WORDS_CNT = 10; + + /** + * For each token in the input string the mapper emits a {word, 1} pair. + */ + private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> { + /** Constant value. */ + private static final IntWritable ONE = new IntWritable(1); + + /** The word converted into the Text. */ + private Text word = new Text(); + + /** + * Emits an entry where the key is the word and the value is always 1. + * + * @param key the current position in the input file (not used here) + * @param val the text string + * @param ctx mapper context + * @throws IOException + * @throws InterruptedException + */ + @Override protected void map(LongWritable key, Text val, Context ctx) + throws IOException, InterruptedException { + // Get the mapped object. + final String line = val.toString(); + + // Splits the given string into words. + final String[] words = line.split("[^a-zA-Z0-9]"); + + for (final String w : words) { + // Only emit counts for longer words. + if (w.length() <= 3) + continue; + + word.set(w); + + // Write the word into the context with an initial count of 1. + ctx.write(word, ONE); + } + } + } + + /** + * The reducer uses a priority queue to rank the words based on their number of occurrences. + */ + private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + private MinMaxPriorityQueue<Entry<Integer, String>> q; + + TopNWordsReducer() { + q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() { + @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) { + return o1.getKey().compareTo(o2.getKey()); + } + })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create(); + } + + /** + * This method doesn't emit anything, but just keeps track of the top N words. + * + * @param key The word. + * @param vals The word counts. + * @param ctx Reducer context. + * @throws IOException If failed. + * @throws InterruptedException If failed. + */ + @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException, + InterruptedException { + int sum = 0; + + for (IntWritable val : vals) + sum += val.get(); + + q.add(immutableEntry(sum, key.toString())); + } + + /** + * This method is called after all the word entries have been processed. It writes the accumulated + * statistics to the job output file. + * + * @param ctx The job context. + * @throws IOException If failed. + * @throws InterruptedException If failed. + */ + @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { + IntWritable i = new IntWritable(); + + Text txt = new Text(); + + // Iterate in descending order. + while (!q.isEmpty()) { + Entry<Integer, String> e = q.removeFirst(); + + i.set(e.getKey()); + + txt.set(e.getValue()); + + ctx.write(txt, i); + } + } + } + + /** + * Configures the Hadoop MapReduce job. + * + * @return Instance of the Hadoop MapReduce job. + * @throws IOException If failed. + */ + private Job createConfigBasedHadoopJob() throws IOException { + Job jobCfg = new Job(); + + Configuration cfg = jobCfg.getConfiguration(); + + // Use explicit configuration of distributed file system, if provided.
+ if (DFS_CFG != null) + cfg.addResource(U.resolveIgniteUrl(DFS_CFG)); + + jobCfg.setJobName("HadoopPopularWordExample"); + jobCfg.setJarByClass(HadoopPopularWordsTest.class); + jobCfg.setInputFormatClass(TextInputFormat.class); + jobCfg.setOutputKeyClass(Text.class); + jobCfg.setOutputValueClass(IntWritable.class); + jobCfg.setMapperClass(TokenizingMapper.class); + jobCfg.setReducerClass(TopNWordsReducer.class); + + FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR); + FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR); + + // Local job tracker allows only one task per wave, but text input format + // replaces it with the calculated value based on input split size option. + if ("local".equals(cfg.get("mapred.job.tracker", "local"))) { + // Split job into tasks using 32MB split size. + FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024); + FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE); + } + + return jobCfg; + } + + /** + * Runs the Hadoop job. + * + * @return {@code True} if succeeded, {@code false} otherwise. + * @throws Exception If failed. + */ + private boolean runWordCountConfigBasedHadoopJob() throws Exception { + Job job = createConfigBasedHadoopJob(); + + // Distributed file system this job will work with. + FileSystem fs = FileSystem.get(job.getConfiguration()); + + X.println(">>> Using distributed file system: " + fs.getHomeDirectory()); + + // Prepare input and output job directories. + prepareDirectories(fs); + + long time = System.currentTimeMillis(); + + // Run job. + boolean res = job.waitForCompletion(true); + + X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec."); + + // Move job results into local file system, so you can view calculated results. + publishResults(fs); + + return res; + } + + /** + * Prepare job's data: clean up result directories that might have been left over + * after previous runs, copy input files from the local file system into DFS. + * + * @param fs Distributed file system to use in job. + * @throws IOException If failed. + */ + private void prepareDirectories(FileSystem fs) throws IOException { + X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR); + + fs.delete(RESULT_DFS_DIR, true); + + X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); + + fs.delete(BOOKS_DFS_DIR, true); + + X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR); + + fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR); + } + + /** + * Publish job execution results into local file system, so you can view them. + * + * @param fs Distributed file system used in job. + * @throws IOException If failed. + */ + private void publishResults(FileSystem fs) throws IOException { + X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); + + fs.delete(BOOKS_DFS_DIR, true); + + X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR); + + fs.delete(RESULT_LOCAL_DIR, true); + + X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR); + + fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR); + } + + /** + * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of + * occurrences of each word in the source files, the N most popular words are selected. + * + * @param args None.
+ */ + public static void main(String[] args) { + try { + new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob(); + } + catch (Exception e) { + X.println(">>> Failed to run word count example: " + e.getMessage()); + } + + System.exit(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java new file mode 100644 index 0000000..c73ee9f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jdk8.backport.*; + +import java.util.concurrent.*; + +/** + * For tests. + */ +public class HadoopSharedMap { + /** */ + private static final ConcurrentMap<String, HadoopSharedMap> maps = new ConcurrentHashMap8<>(); + + /** */ + private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>(); + + /** + * Private. + */ + private HadoopSharedMap() { + // No-op. + } + + /** + * Puts object by key. + * + * @param key Key. + * @param val Value. + */ + public <T> T put(String key, T val) { + Object old = map.putIfAbsent(key, val); + + return old == null ? val : (T)old; + } + + /** + * @param cls Class. + * @return Map of static fields. + */ + public static HadoopSharedMap map(Class<?> cls) { + HadoopSharedMap m = maps.get(cls.getName()); + + if (m != null) + return m; + + HadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new HadoopSharedMap()); + + return old == null ? m : old; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java index 76357c0..772e77d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java @@ -17,13 +17,15 @@ package org.apache.ignite.internal.processors.hadoop; +import org.apache.ignite.configuration.*; + /** * External test for sorting. 
*/ public class HadoopSortingExternalTest extends HadoopSortingTest { /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); cfg.setExternalExecution(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java index 5d28a30..3f6594a 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java @@ -23,6 +23,7 @@ import org.apache.hadoop.io.serializer.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; import java.io.*; @@ -64,8 +65,8 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); cfg.setExternalExecution(false); @@ -93,7 +94,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest { X.printerrln("Data generation started."); - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(job.getConfiguration())).get(180000); X.printerrln("Data generation complete."); @@ -124,7 +125,7 @@ public class HadoopSortingTest extends HadoopAbstractSelfTest { X.printerrln("Job started."); - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), createJobInfo(job.getConfiguration())).get(180000); X.printerrln("Job complete."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java new file mode 100644 index 0000000..1a93223 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.typedef.*; + +/** + * Hadoop node startup. + */ +public class HadoopStartup { + /** + * @param args Arguments. + */ + public static void main(String[] args) { + G.start("config/hadoop/default-config.xml"); + } + + /** + * @return Configuration for job run. + */ + @SuppressWarnings("UnnecessaryFullyQualifiedName") + public static Configuration configuration() { + Configuration cfg = new Configuration(); + + cfg.set("fs.defaultFS", "igfs://igfs@localhost"); + + cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); + + cfg.set("mapreduce.framework.name", "ignite"); + cfg.set("mapreduce.jobtracker.address", "localhost:11211"); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java index a489f28..e321191 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -46,7 +46,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; */ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { /** */ - private static GridHadoopSharedMap m = GridHadoopSharedMap.map(HadoopTaskExecutionSelfTest.class); + private static HadoopSharedMap m = HadoopSharedMap.map(HadoopTaskExecutionSelfTest.class); /** Line count. 
*/ private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger()); @@ -105,8 +105,8 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { } /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); cfg.setMaxParallelTasks(5); cfg.setExternalExecution(false); @@ -145,7 +145,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { job.setJarByClass(getClass()); - IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(job.getConfiguration())); fut.get(); @@ -189,7 +189,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { job.setJarByClass(getClass()); - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 2); + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2); IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); @@ -228,7 +228,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { job.setJarByClass(getClass()); - final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 3), + final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3), createJobInfo(job.getConfiguration())); GridTestUtils.assertThrows(log, new Callable<Object>() { @@ -314,7 +314,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { public void testTaskCancelling() throws Exception { Configuration cfg = prepareJobForCancelling(); - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg)); @@ -360,7 +360,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { Hadoop hadoop = grid(0).hadoop(); - GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); //Kill unknown job. 
boolean killRes = hadoop.kill(jobId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java index 265890d..aaf0f92 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -61,7 +61,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { igfs.mkdirs(inDir); - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); URI inFileUri = URI.create(igfsScheme() + inFile.toString()); @@ -81,7 +81,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); @@ -93,7 +93,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { ctx.mockOutput().clear(); - ctx.taskInfo(new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); + ctx.taskInfo(new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); ctx.run(); @@ -112,7 +112,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { */ private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, int taskNum, String... 
words) throws IgniteCheckedException { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); @@ -184,14 +184,14 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { */ private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) throws IgniteCheckedException { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob); mapCtx.run(); //Prepare input for combine - taskInfo = new GridHadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); + taskInfo = new HadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob); @@ -214,7 +214,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { igfs.mkdirs(inDir); - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); URI inFileUri = URI.create(igfsScheme() + inFile.toString()); @@ -235,7 +235,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); //Prepare input for combine - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob); @@ -244,7 +244,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { reduceCtx.run(); - reduceCtx.taskInfo(new GridHadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); + reduceCtx.taskInfo(new HadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); reduceCtx.run(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index d932a8f..b41a260 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -39,13 +39,13 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { * @throws IOException If fails. 
*/ @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { - JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile); + JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile); setupFileSystems(jobConf); HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); - GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); + HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); return new HadoopV2Job(jobId, jobInfo, log); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index 2e1e9fd..b677c63 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -48,7 +48,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); - GridHadoopWordCount2.setTasksClasses(job, true, true, true); + HadoopWordCount2.setTasksClasses(job, true, true, true); Configuration conf = job.getConfiguration(); @@ -57,13 +57,13 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { FileInputFormat.setInputPaths(job, new Path(inFile)); FileOutputFormat.setOutputPath(job, new Path(outFile)); - job.setJarByClass(GridHadoopWordCount2.class); + job.setJarByClass(HadoopWordCount2.class); - Job hadoopJob = GridHadoopWordCount2.getJob(inFile, outFile); + Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile); HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); - GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); + HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); return new HadoopV2Job(jobId, jobInfo, log); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java new file mode 100644 index 0000000..a56c7c7 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.hadoop.planner.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Round-robin mr planner. + */ +public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner { + /** {@inheritDoc} */ + @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { + if (top.isEmpty()) + throw new IllegalArgumentException("Topology is empty"); + + // Has at least one element. + Iterator<ClusterNode> it = top.iterator(); + + Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); + + for (HadoopInputSplit block : job.input()) { + ClusterNode node = it.next(); + + Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id()); + + if (nodeBlocks == null) { + nodeBlocks = new ArrayList<>(); + + mappers.put(node.id(), nodeBlocks); + } + + nodeBlocks.add(block); + + if (!it.hasNext()) + it = top.iterator(); + } + + int[] rdc = new int[job.info().reducers()]; + + for (int i = 0; i < rdc.length; i++) + rdc[i] = i; + + return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java index c3c8806..e444270 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java @@ -178,7 +178,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext { * @param taskInfo Task info. * @param gridJob Grid Hadoop job. */ - public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException { + public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException { super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java new file mode 100644 index 0000000..ef60762 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +import static org.junit.Assert.*; + +/** + * Utility class for tests. + */ +public class HadoopTestUtils { + /** + * Checks that the job statistics file contains valid strings only. + * + * @param reader Buffered reader to get lines of job statistics. + * @return Number of events. + * @throws IOException If failed. + */ + public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { + Collection<String> phases = new HashSet<>(); + + phases.add("submit"); + phases.add("prepare"); + phases.add("start"); + phases.add("finish"); + phases.add("requestId"); + phases.add("responseId"); + + Collection<String> evtTypes = new HashSet<>(); + + evtTypes.add("JOB"); + evtTypes.add("SETUP"); + evtTypes.add("MAP"); + evtTypes.add("SHUFFLE"); + evtTypes.add("REDUCE"); + evtTypes.add("COMBINE"); + evtTypes.add("COMMIT"); + + long evtCnt = 0; + String line; + + Map<Long, String> reduceNodes = new HashMap<>(); + + while((line = reader.readLine()) != null) { + String[] splitLine = line.split(":"); + + // Try to parse the timestamp. + Long.parseLong(splitLine[1]); + + String[] evt = splitLine[0].split(" "); + + assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); + + String phase; + + if ("JOB".equals(evt[0])) + phase = evt[1]; + else { + assertEquals(4, evt.length); + assertTrue("The node id is not defined", !F.isEmpty(evt[3])); + + long taskNum = Long.parseLong(evt[1]); + + if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { + String nodeId = reduceNodes.get(taskNum); + + if (nodeId == null) + reduceNodes.put(taskNum, evt[3]); + else + assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); + } + + phase = evt[2]; + } + + assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); + + evtCnt++; + } + + return evtCnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index 222ba17..ebc89f4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -66,9 +66,9 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - HadoopJob job = new HadoopV2Job(new
GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); - HadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, + HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); HadoopSerialization ser = taskCtx.keySerialization(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java deleted file mode 100644 index 40cf636..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; - -/** - * Example job for testing hadoop task execution. - */ -public class GridHadoopWordCount1 { - /** - * Entry point to start job. - * @param args command line parameters. - * @throws Exception if fails. - */ - public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.out.println("usage: [input] [output]"); - System.exit(-1); - } - - JobConf job = getJob(args[0], args[1]); - - JobClient.runJob(job); - } - - /** - * Gets fully configured JobConf instance. - * - * @param input input file name. - * @param output output directory name. - * @return Job configuration - */ - public static JobConf getJob(String input, String output) { - JobConf conf = new JobConf(GridHadoopWordCount1.class); - conf.setJobName("wordcount"); - - conf.setOutputKeyClass(Text.class); - conf.setOutputValueClass(IntWritable.class); - - setTasksClasses(conf, true, true, true); - - FileInputFormat.setInputPaths(conf, new Path(input)); - FileOutputFormat.setOutputPath(conf, new Path(output)); - - return conf; - } - - /** - * Sets task classes with related info if needed into configuration object. - * - * @param jobConf Configuration to change. - * @param setMapper Option to set mapper and input format classes. - * @param setCombiner Option to set combiner class. - * @param setReducer Option to set reducer and output format classes. 
- */ - public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) { - if (setMapper) { - jobConf.setMapperClass(GridHadoopWordCount1Map.class); - jobConf.setInputFormat(TextInputFormat.class); - } - - if (setCombiner) - jobConf.setCombinerClass(GridHadoopWordCount1Reduce.class); - - if (setReducer) { - jobConf.setReducerClass(GridHadoopWordCount1Reduce.class); - jobConf.setOutputFormat(TextOutputFormat.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java deleted file mode 100644 index 5d8e0cc..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; - -import java.io.*; -import java.util.*; - -/** - * Mapper phase of WordCount job. - */ -public class GridHadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { - /** Writable integer constant of '1' is writing as count of found words. */ - private static final IntWritable one = new IntWritable(1); - - /** Writable container for writing word. */ - private Text word = new Text(); - - /** Flag is to check that mapper was configured before run. 
*/ - private boolean wasConfigured; - - /** {@inheritDoc} */ - @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter) - throws IOException { - - assert wasConfigured : "Mapper should be configured"; - - String line = val.toString(); - - StringTokenizer tokenizer = new StringTokenizer(line); - - while (tokenizer.hasMoreTokens()) { - word.set(tokenizer.nextToken()); - - output.collect(word, one); - } - } - - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - super.configure(job); - - wasConfigured = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java deleted file mode 100644 index 1b69a43..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; - -import java.io.*; -import java.util.*; - -/** - * Combiner and Reducer phase of WordCount job. - */ -public class GridHadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { - /** Flag is to check that mapper was configured before run. 
*/ - private boolean wasConfigured; - - /** {@inheritDoc} */ - @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) - throws IOException { - assert wasConfigured : "Reducer should be configured"; - - int sum = 0; - - while (values.hasNext()) - sum += values.next().get(); - - output.collect(key, new IntWritable(sum)); - } - - @Override public void configure(JobConf job) { - super.configure(job); - - wasConfigured = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java deleted file mode 100644 index 6310363..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; - -import java.io.*; - -/** - * Example job for testing hadoop task execution. - */ -public class GridHadoopWordCount2 { - /** - * Entry point to start job. - * - * @param args Command line parameters. - * @throws Exception If fails. - */ - public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.out.println("usage: [input] [output]"); - System.exit(-1); - } - - Job job = getJob(args[0], args[1]); - - job.submit(); - } - - /** - * Gets fully configured Job instance. - * - * @param input Input file name. - * @param output Output directory name. - * @return Job instance. - * @throws IOException If fails. - */ - public static Job getJob(String input, String output) throws IOException { - Job job = Job.getInstance(); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - setTasksClasses(job, true, true, true); - - FileInputFormat.setInputPaths(job, new Path(input)); - FileOutputFormat.setOutputPath(job, new Path(output)); - - job.setJarByClass(GridHadoopWordCount2.class); - - return job; - } - - /** - * Sets task classes with related info if needed into configuration object. - * - * @param job Configuration to change. - * @param setMapper Option to set mapper and input format classes. 
- * @param setCombiner Option to set combiner class. - * @param setReducer Option to set reducer and output format classes. - */ - public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) { - if (setMapper) { - job.setMapperClass(GridHadoopWordCount2Mapper.class); - job.setInputFormatClass(TextInputFormat.class); - } - - if (setCombiner) - job.setCombinerClass(GridHadoopWordCount2Reducer.class); - - if (setReducer) { - job.setReducerClass(GridHadoopWordCount2Reducer.class); - job.setOutputFormatClass(TextOutputFormat.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java deleted file mode 100644 index 849928a..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.util.*; - -/** - * Mapper phase of WordCount job. - */ -public class GridHadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable { - /** Writable container for writing word. */ - private Text word = new Text(); - - /** Writable integer constant of '1' is writing as count of found words. */ - private static final IntWritable one = new IntWritable(1); - - /** Flag is to check that mapper was configured before run. */ - private boolean wasConfigured; - - /** Flag is to check that mapper was set up before run. 
*/ - private boolean wasSetUp; - - /** {@inheritDoc} */ - @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - assert wasConfigured : "Mapper should be configured"; - assert wasSetUp : "Mapper should be set up"; - - StringTokenizer wordList = new StringTokenizer(val.toString()); - - while (wordList.hasMoreTokens()) { - word.set(wordList.nextToken()); - - ctx.write(word, one); - } - } - - /** {@inheritDoc} */ - @Override protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - wasSetUp = true; - } - - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - wasConfigured = true; - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java deleted file mode 100644 index 922bb2f..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; - -/** - * Combiner and Reducer phase of WordCount job. - */ -public class GridHadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable { - /** Writable container for writing sum of word counts. */ - private IntWritable totalWordCnt = new IntWritable(); - - /** Flag is to check that mapper was configured before run. */ - private boolean wasConfigured; - - /** Flag is to check that mapper was set up before run. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java
deleted file mode 100644
index 922bb2f..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.examples;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-
-import java.io.*;
-
-/**
- * Combiner and Reducer phase of WordCount job.
- */
-public class GridHadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable {
-    /** Writable container for writing sum of word counts. */
-    private IntWritable totalWordCnt = new IntWritable();
-
-    /** Flag is to check that mapper was configured before run. */
-    private boolean wasConfigured;
-
-    /** Flag is to check that mapper was set up before run. */
-    private boolean wasSetUp;
-
-    /** {@inheritDoc} */
-    @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
-        assert wasConfigured : "Reducer should be configured";
-        assert wasSetUp : "Reducer should be set up";
-
-        int wordCnt = 0;
-
-        for (IntWritable value : values)
-            wordCnt += value.get();
-
-        totalWordCnt.set(wordCnt);
-
-        ctx.write(key, totalWordCnt);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-        wasSetUp = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setConf(Configuration conf) {
-        wasConfigured = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Configuration getConf() {
-        return null;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java
new file mode 100644
index 0000000..dd9058d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ * Example job for testing Hadoop task execution.
+ */
+public class HadoopWordCount1 {
+    /**
+     * Entry point to start job.
+     * @param args Command line parameters.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println("usage: [input] [output]");
+            System.exit(-1);
+        }
+
+        JobConf job = getJob(args[0], args[1]);
+
+        JobClient.runJob(job);
+    }
+
+    /**
+     * Gets fully configured JobConf instance.
+     *
+     * @param input Input file name.
+     * @param output Output directory name.
+     * @return Job configuration.
+     */
+    public static JobConf getJob(String input, String output) {
+        JobConf conf = new JobConf(HadoopWordCount1.class);
+        conf.setJobName("wordcount");
+
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(IntWritable.class);
+
+        setTasksClasses(conf, true, true, true);
+
+        FileInputFormat.setInputPaths(conf, new Path(input));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        return conf;
+    }
+
+    /**
+     * Sets task classes and related info into the configuration object as needed.
+     *
+     * @param jobConf Configuration to change.
+     * @param setMapper Option to set mapper and input format classes.
+     * @param setCombiner Option to set combiner class.
+     * @param setReducer Option to set reducer and output format classes.
+     */
+    public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) {
+        if (setMapper) {
+            jobConf.setMapperClass(HadoopWordCount1Map.class);
+            jobConf.setInputFormat(TextInputFormat.class);
+        }
+
+        if (setCombiner)
+            jobConf.setCombinerClass(HadoopWordCount1Reduce.class);
+
+        if (setReducer) {
+            jobConf.setReducerClass(HadoopWordCount1Reduce.class);
+            jobConf.setOutputFormat(TextOutputFormat.class);
+        }
+    }
+}
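For reference, the setTasksClasses() flags above let callers assemble partial pipelines, e.g. a map-only job. The following is a minimal sketch, not part of the patch: the class name and the /tmp paths are hypothetical, and the reduce phase is disabled explicitly so map output is written straight through the default output format.

package org.apache.ignite.internal.processors.hadoop.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/** Sketch: map-only variant of the old-API word count (paths are placeholders). */
public class HadoopWordCount1MapOnlySketch {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(HadoopWordCount1.class);
        conf.setJobName("wordcount-map-only");

        // Output types must still be declared even when no reducer runs.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Mapper and input format only; skip combiner and reducer classes...
        HadoopWordCount1.setTasksClasses(conf, true, false, false);

        // ...and disable the reduce phase so map output is written directly.
        conf.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(conf, new Path("/tmp/wordcount/in"));   // hypothetical
        FileOutputFormat.setOutputPath(conf, new Path("/tmp/wordcount/out")); // hypothetical

        JobClient.runJob(conf);
    }
}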
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
new file mode 100644
index 0000000..c10a7fb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Mapper phase of WordCount job.
+ */
+public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+    /** Writable integer constant '1', written as the count of each found word. */
+    private static final IntWritable one = new IntWritable(1);
+
+    /** Writable container for writing a word. */
+    private Text word = new Text();
+
+    /** Flag to check that the mapper was configured before the run. */
+    private boolean wasConfigured;
+
+    /** {@inheritDoc} */
+    @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+
+        assert wasConfigured : "Mapper should be configured";
+
+        String line = val.toString();
+
+        StringTokenizer tokenizer = new StringTokenizer(line);
+
+        while (tokenizer.hasMoreTokens()) {
+            word.set(tokenizer.nextToken());
+
+            output.collect(word, one);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void configure(JobConf job) {
+        super.configure(job);
+
+        wasConfigured = true;
+    }
+}
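Because the old-API mapper only needs an OutputCollector and a Reporter, it can be exercised without a cluster. A throwaway harness along these lines (the harness class name is ours, not part of the patch; note configure() must run first or the mapper's assertion fires when assertions are enabled with -ea):

package org.apache.ignite.internal.processors.hadoop.examples;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Throwaway harness: feeds one line to HadoopWordCount1Map and prints what it emits. */
public class HadoopWordCount1MapHarness {
    public static void main(String[] args) throws IOException {
        HadoopWordCount1Map mapper = new HadoopWordCount1Map();

        // Satisfy the wasConfigured assertion before calling map().
        mapper.configure(new JobConf());

        final List<String> collected = new ArrayList<>();

        // Stringify immediately: the mapper reuses its Text container across calls.
        OutputCollector<Text, IntWritable> output = new OutputCollector<Text, IntWritable>() {
            @Override public void collect(Text key, IntWritable val) {
                collected.add(key + "=" + val);
            }
        };

        mapper.map(new LongWritable(0), new Text("red blue red"), output, Reporter.NULL);

        System.out.println(collected); // [red=1, blue=1, red=1]
    }
}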
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
new file mode 100644
index 0000000..76cd1c3
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Combiner and Reducer phase of WordCount job.
+ */
+public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+    /** Flag to check that the reducer was configured before the run. */
+    private boolean wasConfigured;
+
+    /** {@inheritDoc} */
+    @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+        assert wasConfigured : "Reducer should be configured";
+
+        int sum = 0;
+
+        while (values.hasNext())
+            sum += values.next().get();
+
+        output.collect(key, new IntWritable(sum));
+    }
+
+    @Override public void configure(JobConf job) {
+        super.configure(job);
+
+        wasConfigured = true;
+    }
+}
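The reducer can be driven the same way; a sketch under the same caveats (harness name and values are ours):

package org.apache.ignite.internal.processors.hadoop.examples;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;
import java.util.Arrays;

/** Throwaway harness: sums three '1' counts for the key "red". */
public class HadoopWordCount1ReduceHarness {
    public static void main(String[] args) throws IOException {
        HadoopWordCount1Reduce reducer = new HadoopWordCount1Reduce();

        // Satisfy the wasConfigured assertion before calling reduce().
        reducer.configure(new JobConf());

        OutputCollector<Text, IntWritable> output = new OutputCollector<Text, IntWritable>() {
            @Override public void collect(Text key, IntWritable val) {
                System.out.println(key + "=" + val); // red=3
            }
        };

        reducer.reduce(new Text("red"),
            Arrays.asList(new IntWritable(1), new IntWritable(1), new IntWritable(1)).iterator(),
            output, Reporter.NULL);
    }
}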
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
new file mode 100644
index 0000000..dc68df7
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+
+import java.io.*;
+
+/**
+ * Example job for testing Hadoop task execution.
+ */
+public class HadoopWordCount2 {
+    /**
+     * Entry point to start job.
+     *
+     * @param args Command line parameters.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length != 2) {
+            System.out.println("usage: [input] [output]");
+            System.exit(-1);
+        }
+
+        Job job = getJob(args[0], args[1]);
+
+        job.submit();
+    }
+
+    /**
+     * Gets fully configured Job instance.
+     *
+     * @param input Input file name.
+     * @param output Output directory name.
+     * @return Job instance.
+     * @throws IOException If failed.
+     */
+    public static Job getJob(String input, String output) throws IOException {
+        Job job = Job.getInstance();
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        setTasksClasses(job, true, true, true);
+
+        FileInputFormat.setInputPaths(job, new Path(input));
+        FileOutputFormat.setOutputPath(job, new Path(output));
+
+        job.setJarByClass(HadoopWordCount2.class);
+
+        return job;
+    }
+
+    /**
+     * Sets task classes and related info into the configuration object as needed.
+     *
+     * @param job Job to change.
+     * @param setMapper Option to set mapper and input format classes.
+     * @param setCombiner Option to set combiner class.
+     * @param setReducer Option to set reducer and output format classes.
+     */
+    public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) {
+        if (setMapper) {
+            job.setMapperClass(HadoopWordCount2Mapper.class);
+            job.setInputFormatClass(TextInputFormat.class);
+        }
+
+        if (setCombiner)
+            job.setCombinerClass(HadoopWordCount2Reducer.class);
+
+        if (setReducer) {
+            job.setReducerClass(HadoopWordCount2Reducer.class);
+            job.setOutputFormatClass(TextOutputFormat.class);
+        }
+    }
+}
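Since getJob() builds on the default Configuration, the simplest smoke test is a blocking run against whatever file system and framework that configuration resolves to. A minimal sketch, with hypothetical paths and a sketch-only class name:

package org.apache.ignite.internal.processors.hadoop.examples;

import org.apache.hadoop.mapreduce.Job;

/** Sketch: run the new-API word count and block until it finishes. */
public class HadoopWordCount2RunSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical paths, resolved against the configured default file system.
        Job job = HadoopWordCount2.getJob("/tmp/wordcount/in", "/tmp/wordcount/out");

        // Unlike submit() in main() above, waitForCompletion() blocks and
        // reports success, which is handier in tests.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}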
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
new file mode 100644
index 0000000..6ca7ccd
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Mapper phase of WordCount job.
+ */
+public class HadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable {
+    /** Writable container for writing a word. */
+    private Text word = new Text();
+
+    /** Writable integer constant '1', written as the count of each found word. */
+    private static final IntWritable one = new IntWritable(1);
+
+    /** Flag to check that the mapper was configured before the run. */
+    private boolean wasConfigured;
+
+    /** Flag to check that the mapper was set up before the run. */
+    private boolean wasSetUp;
+
+    /** {@inheritDoc} */
+    @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+        assert wasConfigured : "Mapper should be configured";
+        assert wasSetUp : "Mapper should be set up";
+
+        StringTokenizer wordList = new StringTokenizer(val.toString());
+
+        while (wordList.hasMoreTokens()) {
+            word.set(wordList.nextToken());
+
+            ctx.write(word, one);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        wasSetUp = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setConf(Configuration conf) {
+        wasConfigured = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Configuration getConf() {
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
new file mode 100644
index 0000000..fedaaf9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.examples;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+
+/**
+ * Combiner and Reducer phase of WordCount job.
+ */
+public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable {
+    /** Writable container for writing the sum of word counts. */
+    private IntWritable totalWordCnt = new IntWritable();
+
+    /** Flag to check that the reducer was configured before the run. */
+    private boolean wasConfigured;
+
+    /** Flag to check that the reducer was set up before the run. */
+    private boolean wasSetUp;
+
+    /** {@inheritDoc} */
+    @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
+        assert wasConfigured : "Reducer should be configured";
+        assert wasSetUp : "Reducer should be set up";
+
+        int wordCnt = 0;
+
+        for (IntWritable value : values)
+            wordCnt += value.get();
+
+        totalWordCnt.set(wordCnt);
+
+        ctx.write(key, totalWordCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        wasSetUp = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setConf(Configuration conf) {
+        wasConfigured = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Configuration getConf() {
+        return null;
+    }
+
+}
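As with the old API, these pieces compose into partial pipelines, e.g. a map-only new-API job that reuses setTasksClasses() from HadoopWordCount2 above. A sketch with hypothetical paths and a sketch-only class name:

package org.apache.ignite.internal.processors.hadoop.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Sketch: map-only variant of the new-API word count (paths are placeholders). */
public class HadoopWordCount2MapOnlySketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Mapper and input format only; no combiner, no reducer.
        HadoopWordCount2.setTasksClasses(job, true, false, false);

        // Zero reduce tasks: map output goes straight to the default output format.
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path("/tmp/wordcount/in"));   // hypothetical
        FileOutputFormat.setOutputPath(job, new Path("/tmp/wordcount/out")); // hypothetical

        job.setJarByClass(HadoopWordCount2.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}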