http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java new file mode 100644 index 0000000..a2f2ac3 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.collect.*; + import org.apache.hadoop.conf.*; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.*; + import org.apache.hadoop.io.*; + import org.apache.hadoop.mapreduce.*; + import org.apache.hadoop.mapreduce.lib.input.*; + import org.apache.hadoop.mapreduce.lib.output.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.util.*; + import java.util.Map.*; + + import static com.google.common.collect.Maps.*; + import static com.google.common.collect.MinMaxPriorityQueue.*; + import static java.util.Collections.*; + + /** + * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than + * 3 characters the number of occurrences is calculated. Finally, the 10 words with the highest occurrence count are + * output. + * + * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system + * path. + */ + public class HadoopPopularWordsTest { + /** Ignite home. */ + private static final String IGNITE_HOME = U.getIgniteHome(); + + /** The path to the input directory. All files in that directory will be processed. */ + private static final Path BOOKS_LOCAL_DIR = + new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books"); + + /** The path to the output directory. The result file will be written to this location. */ + private static final Path RESULT_LOCAL_DIR = + new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output"); + + /** Popular books source dir in DFS. */ + private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in"); + + /** Popular books result dir in DFS. */ + private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out"); + + /** Path to the distributed file system configuration. 
*/ + private static final String DFS_CFG = "examples/config/filesystem/core-site.xml"; + + /** Top N words to select **/ + private static final int POPULAR_WORDS_CNT = 10; + + /** + * For each token in the input string the mapper emits a {word, 1} pair. + */ + private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> { + /** Constant value. */ + private static final IntWritable ONE = new IntWritable(1); + + /** The word converted into the Text. */ + private Text word = new Text(); + + /** + * Emits a entry where the key is the word and the value is always 1. + * + * @param key the current position in the input file (not used here) + * @param val the text string + * @param ctx mapper context + * @throws IOException + * @throws InterruptedException + */ + @Override protected void map(LongWritable key, Text val, Context ctx) + throws IOException, InterruptedException { + // Get the mapped object. + final String line = val.toString(); + + // Splits the given string to words. + final String[] words = line.split("[^a-zA-Z0-9]"); + + for (final String w : words) { + // Only emit counts for longer words. + if (w.length() <= 3) + continue; + + word.set(w); + + // Write the word into the context with the initial count equals 1. + ctx.write(word, ONE); + } + } + } + + /** + * The reducer uses a priority queue to rank the words based on its number of occurrences. + */ + private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + private MinMaxPriorityQueue<Entry<Integer, String>> q; + + TopNWordsReducer() { + q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() { + @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) { + return o1.getKey().compareTo(o2.getKey()); + } + })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create(); + } + + /** + * This method doesn't emit anything, but just keeps track of the top N words. + * + * @param key The word. + * @param vals The words counts. + * @param ctx Reducer context. + * @throws IOException If failed. + * @throws InterruptedException If failed. + */ + @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException, + InterruptedException { + int sum = 0; + + for (IntWritable val : vals) + sum += val.get(); + + q.add(immutableEntry(sum, key.toString())); + } + + /** + * This method is called after all the word entries have been processed. It writes the accumulated + * statistics to the job output file. + * + * @param ctx The job context. + * @throws IOException If failed. + * @throws InterruptedException If failed. + */ + @Override protected void cleanup(Context ctx) throws IOException, InterruptedException { + IntWritable i = new IntWritable(); + + Text txt = new Text(); + + // iterate in desc order + while (!q.isEmpty()) { + Entry<Integer, String> e = q.removeFirst(); + + i.set(e.getKey()); + + txt.set(e.getValue()); + + ctx.write(txt, i); + } + } + } + + /** + * Configures the Hadoop MapReduce job. + * + * @return Instance of the Hadoop MapRed job. + * @throws IOException If failed. + */ + private Job createConfigBasedHadoopJob() throws IOException { + Job jobCfg = new Job(); + + Configuration cfg = jobCfg.getConfiguration(); + + // Use explicit configuration of distributed file system, if provided. 
+ if (DFS_CFG != null) + cfg.addResource(U.resolveIgniteUrl(DFS_CFG)); + + jobCfg.setJobName("HadoopPopularWordExample"); + jobCfg.setJarByClass(HadoopPopularWordsTest.class); + jobCfg.setInputFormatClass(TextInputFormat.class); + jobCfg.setOutputKeyClass(Text.class); + jobCfg.setOutputValueClass(IntWritable.class); + jobCfg.setMapperClass(TokenizingMapper.class); + jobCfg.setReducerClass(TopNWordsReducer.class); + + FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR); + FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR); + + // The local job tracker allows only one task per wave, but the text input format + // replaces it with a value calculated from the input split size option. + if ("local".equals(cfg.get("mapred.job.tracker", "local"))) { + // Split job into tasks using 32MB split size. + FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024); + FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE); + } + + return jobCfg; + } + + /** + * Runs the Hadoop job. + * + * @return {@code True} if succeeded, {@code false} otherwise. + * @throws Exception If failed. + */ + private boolean runWordCountConfigBasedHadoopJob() throws Exception { + Job job = createConfigBasedHadoopJob(); + + // Distributed file system this job will work with. + FileSystem fs = FileSystem.get(job.getConfiguration()); + + X.println(">>> Using distributed file system: " + fs.getHomeDirectory()); + + // Prepare input and output job directories. + prepareDirectories(fs); + + long time = System.currentTimeMillis(); + + // Run job. + boolean res = job.waitForCompletion(true); + + X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec."); + + // Move job results into local file system, so you can view calculated results. + publishResults(fs); + + return res; + } + + /** + * Prepare job's data: clean up result directories that might have been left over + * after previous runs, copy input files from the local file system into DFS. + * + * @param fs Distributed file system to use in job. + * @throws IOException If failed. + */ + private void prepareDirectories(FileSystem fs) throws IOException { + X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR); + + fs.delete(RESULT_DFS_DIR, true); + + X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); + + fs.delete(BOOKS_DFS_DIR, true); + + X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR); + + fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR); + } + + /** + * Publish job execution results into local file system, so you can view them. + * + * @param fs Distributed file system used in job. + * @throws IOException If failed. + */ + private void publishResults(FileSystem fs) throws IOException { + X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR); + + fs.delete(BOOKS_DFS_DIR, true); + + X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR); + + fs.delete(RESULT_LOCAL_DIR, true); + + X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR); + + fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR); + } + + /** + * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of + * occurrences of each word in the source files, the N most popular words are selected. + * + * @param args None. 
+ */ + public static void main(String[] args) { + try { + new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob(); + } + catch (Exception e) { + X.println(">>> Failed to run word count example: " + e.getMessage()); + } + + System.exit(0); + } +}
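The ranking trick inside TopNWordsReducer is worth calling out: a size-bounded Guava MinMaxPriorityQueue with a reversed comparator keeps only the N highest counts, and removeFirst() then drains the survivors in descending order. The standalone sketch below isolates that behavior; the class name and sample words are illustrative only, and it assumes Guava is available on the classpath.

import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;

import java.util.Comparator;
import java.util.Map.Entry;

import static java.util.Collections.reverseOrder;

/** Illustrative sketch of the bounded top-N queue used by TopNWordsReducer. */
public class TopNQueueSketch {
    public static void main(String[] args) {
        int topN = 3;

        // Order entries by descending count; the queue keeps at most topN of them
        // and evicts the entry with the smallest count on overflow.
        MinMaxPriorityQueue<Entry<Integer, String>> q = MinMaxPriorityQueue
            .orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
                @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
                    return o1.getKey().compareTo(o2.getKey());
                }
            }))
            .maximumSize(topN)
            .create();

        q.add(Maps.immutableEntry(5, "word"));
        q.add(Maps.immutableEntry(42, "hadoop"));
        q.add(Maps.immutableEntry(17, "ignite"));
        q.add(Maps.immutableEntry(2, "rare")); // Evicted: only the 3 highest counts survive.

        // removeFirst() returns the head of the reversed order, i.e. the highest remaining count.
        while (!q.isEmpty()) {
            Entry<Integer, String> e = q.removeFirst();

            System.out.println(e.getValue() + '\t' + e.getKey());
        }
        // Prints: hadoop 42, then ignite 17, then word 5.
    }
}

This is the same pattern the reducer's cleanup() method relies on when it writes the accumulated statistics to the job output.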
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java new file mode 100644 index 0000000..5d5bb94 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Test of the wrapper of the native serialization. + */ +public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest { + /** + * Tests read/write of IntWritable via native WritableSerialization. + * @throws Exception If fails. + */ + public void testIntWritableSerialization() throws Exception { + HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + DataOutput out = new DataOutputStream(buf); + + ser.write(out, new IntWritable(3)); + ser.write(out, new IntWritable(-5)); + + assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray())); + + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + + assertEquals(3, ((IntWritable)ser.read(in, null)).get()); + assertEquals(-5, ((IntWritable)ser.read(in, null)).get()); + } + + /** + * Tests read/write of Integer via native JavaSerialization. + * @throws Exception If fails. 
+ */ + public void testIntJavaSerialization() throws Exception { + HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + DataOutput out = new DataOutputStream(buf); + + ser.write(out, 3); + ser.write(out, -5); + ser.close(); + + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + + assertEquals(3, ((Integer)ser.read(in, null)).intValue()); + assertEquals(-5, ((Integer)ser.read(in, null)).intValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java new file mode 100644 index 0000000..c73ee9f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jdk8.backport.*; + +import java.util.concurrent.*; + +/** + * For tests. + */ +public class HadoopSharedMap { + /** */ + private static final ConcurrentMap<String, HadoopSharedMap> maps = new ConcurrentHashMap8<>(); + + /** */ + private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>(); + + /** + * Private. + */ + private HadoopSharedMap() { + // No-op. + } + + /** + * Puts object by key. + * + * @param key Key. + * @param val Value. + */ + public <T> T put(String key, T val) { + Object old = map.putIfAbsent(key, val); + + return old == null ? val : (T)old; + } + + /** + * @param cls Class. + * @return Map of static fields. + */ + public static HadoopSharedMap map(Class<?> cls) { + HadoopSharedMap m = maps.get(cls.getName()); + + if (m != null) + return m; + + HadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new HadoopSharedMap()); + + return old == null ? 
m : old; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java new file mode 100644 index 0000000..772e77d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.configuration.*; + +/** + * External test for sorting. + */ +public class HadoopSortingExternalTest extends HadoopSortingTest { + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(true); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java new file mode 100644 index 0000000..3f6594a --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests correct sorting. + */ +public class HadoopSortingTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_INPUT = "/test-in"; + + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * @return {@code True} if IGFS is enabled on Hadoop nodes. + */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSortSimple() throws Exception { + // Generate test data. + Job job = Job.getInstance(); + + job.setInputFormatClass(InFormat.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + + job.setMapperClass(Mapper.class); + job.setNumReduceTasks(0); + + setupFileSystems(job.getConfiguration()); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT)); + + X.printerrln("Data generation started."); + + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())).get(180000); + + X.printerrln("Data generation complete."); + + // Run main map-reduce job. + job = Job.getInstance(); + + setupFileSystems(job.getConfiguration()); + + job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + + "," + WritableSerialization.class.getName()); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setSortComparatorClass(JavaSerializationComparator.class); + + job.setMapperClass(MyMapper.class); + job.setReducerClass(MyReducer.class); + + job.setNumReduceTasks(2); + + job.setMapOutputKeyClass(UUID.class); + job.setMapOutputValueClass(NullWritable.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + + X.printerrln("Job started."); + + grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2), + createJobInfo(job.getConfiguration())).get(180000); + + X.printerrln("Job complete."); + + // Check result. 
+ Path outDir = new Path(igfsScheme() + PATH_OUTPUT); + + AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration()); + + for (FileStatus file : fs.listStatus(outDir)) { + X.printerrln("__ file: " + file); + + if (file.getLen() == 0) + continue; + + FSDataInputStream in = fs.open(file.getPath()); + + Scanner sc = new Scanner(in); + + UUID prev = null; + + while(sc.hasNextLine()) { + UUID next = UUID.fromString(sc.nextLine()); + +// X.printerrln("___ check: " + next); + + if (prev != null) + assertTrue(prev.compareTo(next) < 0); + + prev = next; + } + } + } + + public static class InFormat extends InputFormat<Text, NullWritable> { + /** {@inheritDoc} */ + @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { + List<InputSplit> res = new ArrayList<>(); + + FakeSplit split = new FakeSplit(20); + + for (int i = 0; i < 10; i++) + res.add(split); + + return res; + } + + /** {@inheritDoc} */ + @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split, + TaskAttemptContext ctx) throws IOException, InterruptedException { + return new RecordReader<Text, NullWritable>() { + /** */ + int cnt; + + /** */ + Text txt = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { + // No-op. + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + return ++cnt <= split.getLength(); + } + + @Override public Text getCurrentKey() { + txt.set(UUID.randomUUID().toString()); + +// X.printerrln("___ read: " + txt); + + return txt; + } + + @Override public NullWritable getCurrentValue() { + return NullWritable.get(); + } + + @Override public float getProgress() throws IOException, InterruptedException { + return (float)cnt / split.getLength(); + } + + @Override public void close() { + // No-op. + } + }; + } + } + + public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> { + /** {@inheritDoc} */ + @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException { +// X.printerrln("___ map: " + val); + + ctx.write(UUID.fromString(val.toString()), NullWritable.get()); + } + } + + public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> { + /** */ + private Text text = new Text(); + + /** {@inheritDoc} */ + @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx) + throws IOException, InterruptedException { +// X.printerrln("___ rdc: " + key); + + text.set(key.toString()); + + ctx.write(text, NullWritable.get()); + } + } + + public static class FakeSplit extends InputSplit implements Writable { + /** */ + private static final String[] HOSTS = {"127.0.0.1"}; + + /** */ + private int len; + + /** + * @param len Length. + */ + public FakeSplit(int len) { + this.len = len; + } + + /** + * + */ + public FakeSplit() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public long getLength() throws IOException, InterruptedException { + return len; + } + + /** {@inheritDoc} */ + @Override public String[] getLocations() throws IOException, InterruptedException { + return HOSTS; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + out.writeInt(len); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + len = in.readInt(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java new file mode 100644 index 0000000..ee490be --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}. + */ +public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest { + /** + * Tests serialization of wrapper and the wrapped native split. + * @throws Exception If fails. 
+ */ + public void testSerialization() throws Exception { + FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"}); + + assertEquals("/path/to/file:100+500", nativeSplit.toString()); + + HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations()); + + assertEquals("[host1, host2]", Arrays.toString(split.hosts())); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + ObjectOutput out = new ObjectOutputStream(buf); + + out.writeObject(split); + + ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray())); + + final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject(); + + assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString()); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + res.hosts(); + + return null; + } + }, AssertionError.class, null); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java new file mode 100644 index 0000000..1a93223 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.internal.util.typedef.*; + +/** + * Hadoop node startup. + */ +public class HadoopStartup { + /** + * @param args Arguments. + */ + public static void main(String[] args) { + G.start("config/hadoop/default-config.xml"); + } + + /** + * @return Configuration for job run. 
+ */ + @SuppressWarnings("UnnecessaryFullyQualifiedName") + public static Configuration configuration() { + Configuration cfg = new Configuration(); + + cfg.set("fs.defaultFS", "igfs://igfs@localhost"); + + cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); + + cfg.set("mapreduce.framework.name", "ignite"); + cfg.set("mapreduce.jobtracker.address", "localhost:11211"); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java new file mode 100644 index 0000000..20c5db2 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests map-reduce task execution basics. + */ +public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { + /** */ + private static HadoopSharedMap m = HadoopSharedMap.map(HadoopTaskExecutionSelfTest.class); + + /** Line count. */ + private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger()); + + /** Executed tasks. */ + private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger()); + + /** Cancelled tasks. 
*/ + private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger()); + + /** Working directory of each task. */ + private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs", + new ConcurrentHashMap<String, String>()); + + /** Mapper id to fail. */ + private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger()); + + /** Number of splits of the current input. */ + private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger()); + + /** Test param. */ + private static final String MAP_WRITE = "test.map.write"; + + + /** {@inheritDoc} */ + @Override public FileSystemConfiguration igfsConfiguration() { + FileSystemConfiguration cfg = super.igfsConfiguration(); + + cfg.setFragmentizerEnabled(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + grid(0).fileSystem(igfsName).format(); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setMaxParallelTasks(5); + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testMapRun() throws Exception { + int lineCnt = 10000; + String fileName = "/testFile"; + + prepareFile(fileName, lineCnt); + + totalLineCnt.set(0); + taskWorkDirs.clear(); + + Configuration cfg = new Configuration(); + + cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestMapper.class); + + job.setNumReduceTasks(0); + + job.setInputFormatClass(TextInputFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + + assertEquals(lineCnt, totalLineCnt.get()); + + assertEquals(32, taskWorkDirs.size()); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMapCombineRun() throws Exception { + int lineCnt = 10001; + String fileName = "/testFile"; + + prepareFile(fileName, lineCnt); + + totalLineCnt.set(0); + taskWorkDirs.clear(); + + Configuration cfg = new Configuration(); + + cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + cfg.setBoolean(MAP_WRITE, true); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestMapper.class); + job.setCombinerClass(TestCombiner.class); + job.setReducerClass(TestReducer.class); + + job.setNumReduceTasks(2); + + job.setInputFormatClass(TextInputFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + + assertEquals(lineCnt, totalLineCnt.get()); + + assertEquals(34, taskWorkDirs.size()); + + for (int g = 0; g < gridCount(); g++) + grid(g).hadoop().finishFuture(jobId).get(); + } + + /** + * @throws Exception If failed. + */ + public void testMapperException() throws Exception { + prepareFile("/testFile", 1000); + + Configuration cfg = new Configuration(); + + cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(FailMapper.class); + + job.setNumReduceTasks(0); + + job.setInputFormatClass(TextInputFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); + + job.setJarByClass(getClass()); + + final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3), + createJobInfo(job.getConfiguration())); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + } + + /** + * @param fileName File name. + * @param lineCnt Line count. + * @throws Exception If failed. + */ + private void prepareFile(String fileName, int lineCnt) throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(igfsName); + + try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) { + PrintWriter w = new PrintWriter(new OutputStreamWriter(os)); + + for (int i = 0; i < lineCnt; i++) + w.print("Hello, Hadoop map-reduce!\n"); + + w.flush(); + } + } + + /** + * Prepare job with mappers to cancel. + * @return Fully configured job. + * @throws Exception If fails. 
+ */ + private Configuration prepareJobForCancelling() throws Exception { + prepareFile("/testFile", 1500); + + executedTasks.set(0); + cancelledTasks.set(0); + failMapperId.set(0); + splitsCount.set(0); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(CancellingTestMapper.class); + + job.setNumReduceTasks(0); + + job.setInputFormatClass(InFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); + + job.setJarByClass(getClass()); + + return job.getConfiguration(); + } + + /** + * Test input format. + */ + private static class InFormat extends TextInputFormat { + @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException { + List<InputSplit> res = super.getSplits(ctx); + + splitsCount.set(res.size()); + + X.println("___ split of input: " + splitsCount.get()); + + return res; + } + } + + /** + * @throws Exception If failed. + */ + public void testTaskCancelling() throws Exception { + Configuration cfg = prepareJobForCancelling(); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); + + final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg)); + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return splitsCount.get() > 0; + } + }, 20000)) { + U.dumpThreads(log); + + assertTrue(false); + } + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return executedTasks.get() == splitsCount.get(); + } + }, 20000)) { + U.dumpThreads(log); + + assertTrue(false); + } + + // Fail mapper with id "1", cancels others + failMapperId.set(1); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + + assertEquals(executedTasks.get(), cancelledTasks.get() + 1); + } + + /** + * @throws Exception If failed. + */ + public void testJobKill() throws Exception { + Configuration cfg = prepareJobForCancelling(); + + Hadoop hadoop = grid(0).hadoop(); + + HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1); + + //Kill unknown job. + boolean killRes = hadoop.kill(jobId); + + assertFalse(killRes); + + final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg)); + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return splitsCount.get() > 0; + } + }, 20000)) { + U.dumpThreads(log); + + assertTrue(false); + } + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + X.println("___ executed tasks: " + executedTasks.get()); + + return executedTasks.get() == splitsCount.get(); + } + }, 20000)) { + U.dumpThreads(log); + + fail(); + } + + //Kill really ran job. + killRes = hadoop.kill(jobId); + + assertTrue(killRes); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + + assertEquals(executedTasks.get(), cancelledTasks.get()); + + //Kill the same job again. 
+ killRes = hadoop.kill(jobId); + + assertTrue(killRes); + } + + private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> { + private int mapperId; + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + mapperId = executedTasks.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void run(Context ctx) throws IOException, InterruptedException { + try { + super.run(ctx); + } + catch (HadoopTaskCancelledException e) { + cancelledTasks.incrementAndGet(); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + if (mapperId == failMapperId.get()) + throw new IOException(); + + Thread.sleep(1000); + } + } + + /** + * Test failing mapper. + */ + private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> { + /** {@inheritDoc} */ + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + throw new IOException("Expected"); + } + } + + /** + * Mapper calculates number of lines. + */ + private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { + /** Writable integer constant of '1'. */ + private static final IntWritable ONE = new IntWritable(1); + + /** Line count constant. */ + public static final Text LINE_COUNT = new Text("lineCount"); + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + X.println("___ Mapper: " + ctx.getTaskAttemptID()); + + String taskId = ctx.getTaskAttemptID().toString(); + + LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration()); + + String workDir = locFs.getWorkingDirectory().toString(); + + assertNull(taskWorkDirs.put(workDir, taskId)); + } + + /** {@inheritDoc} */ + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + if (ctx.getConfiguration().getBoolean(MAP_WRITE, false)) + ctx.write(LINE_COUNT, ONE); + else + totalLineCnt.incrementAndGet(); + } + } + + /** + * Combiner calculates number of lines. + */ + private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { + /** */ + IntWritable sum = new IntWritable(); + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + X.println("___ Combiner: "); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, + InterruptedException { + int lineCnt = 0; + + for (IntWritable value : values) + lineCnt += value.get(); + + sum.set(lineCnt); + + X.println("___ combo: " + lineCnt); + + ctx.write(key, sum); + } + } + + /** + * Combiner calculates number of lines. 
+ */ + private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + /** */ + IntWritable sum = new IntWritable(); + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + X.println("___ Reducer: " + ctx.getTaskAttemptID()); + + String taskId = ctx.getTaskAttemptID().toString(); + String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString(); + + assertNull(taskWorkDirs.put(workDir, taskId)); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, + InterruptedException { + int lineCnt = 0; + + for (IntWritable value : values) { + lineCnt += value.get(); + + X.println("___ rdcr: " + value.get()); + } + + sum.set(lineCnt); + + ctx.write(key, sum); + + X.println("___ RDCR SUM: " + lineCnt); + + totalLineCnt.addAndGet(lineCnt); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java new file mode 100644 index 0000000..aaf0f92 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Tests of Map, Combine and Reduce task executions of any version of hadoop API. + */ +abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { + /** Empty hosts array. */ + private static final String[] HOSTS = new String[0]; + + /** + * Creates some grid hadoop job. Override this method to create tests for any job implementation. + * + * @param inFile Input file name for the job. + * @param outFile Output file name for the job. + * @return Hadoop job. + * @throws IOException If fails. + */ + public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + + /** + * @return prefix of reducer output file name. 
It's "part-" for v1 and "part-r-" for v2 API + */ + public abstract String getOutputFileNamePrefix(); + + /** + * Tests map task execution. + * + * @throws Exception If fails. + */ + @SuppressWarnings("ConstantConditions") + public void testMapTask() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); + + URI inFileUri = URI.create(igfsScheme() + inFile.toString()); + + try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) { + pw.println("hello0 world0"); + pw.println("world1 hello1"); + } + + HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1); + + try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) { + pw.println("hello2 world2"); + pw.println("world3 hello3"); + } + HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), + igfs.info(inFile).length() - fileBlock1.length()); + + HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); + + HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); + + ctx.mockOutput().clear(); + + ctx.run(); + + assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput())); + + ctx.mockOutput().clear(); + + ctx.taskInfo(new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); + + ctx.run(); + + assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput())); + } + + /** + * Generates input data for reduce-like operation into mock context input and runs the operation. + * + * @param gridJob Job is to create reduce task from. + * @param taskType Type of task - combine or reduce. + * @param taskNum Number of task in job. + * @param words Pairs of words and its counts. + * @return Context with mock output. + * @throws IgniteCheckedException If fails. + */ + private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, + int taskNum, String... words) throws IgniteCheckedException { + HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); + + HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); + + for (int i = 0; i < words.length; i+=2) { + List<IntWritable> valList = new ArrayList<>(); + + for (int j = 0; j < Integer.parseInt(words[i + 1]); j++) + valList.add(new IntWritable(1)); + + ctx.mockInput().put(new Text(words[i]), valList); + } + + ctx.run(); + + return ctx; + } + + /** + * Tests reduce task execution. + * + * @throws Exception If fails. + */ + public void testReduceTask() throws Exception { + HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + + runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); + runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); + + assertEquals( + "word1\t5\n" + + "word2\t10\n", + readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" + + getOutputFileNamePrefix() + "00000") + ); + + assertEquals( + "word3\t7\n" + + "word4\t15\n", + readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" + + getOutputFileNamePrefix() + "00001") + ); + } + + /** + * Tests combine task execution. 
+ * + * @throws Exception If fails. + */ + public void testCombinerTask() throws Exception { + HadoopV2Job gridJob = getHadoopJob("/", "/"); + + HadoopTestTaskContext ctx = + runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); + + assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput())); + + ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15"); + + assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput())); + } + + /** + * Runs chain of map-combine task on file block. + * + * @param fileBlock block of input file to be processed. + * @param gridJob Hadoop job implementation. + * @return Context of combine task with mock output. + * @throws IgniteCheckedException If fails. + */ + private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) + throws IgniteCheckedException { + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); + + HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob); + + mapCtx.run(); + + //Prepare input for combine + taskInfo = new HadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); + + HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob); + + combineCtx.makeTreeOfWritables(mapCtx.mockOutput()); + + combineCtx.run(); + + return combineCtx; + } + + /** + * Tests all job in complex. + * Runs 2 chains of map-combine tasks and sends result into one reduce task. + * + * @throws Exception If fails. + */ + @SuppressWarnings("ConstantConditions") + public void testAllTasks() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input"); + + URI inFileUri = URI.create(igfsScheme() + inFile.toString()); + + generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70); + + //Split file into two blocks + long fileLen = igfs.info(inFile).length(); + + Long l = fileLen / 2; + + HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l); + HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); + + HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + + HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); + + HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); + + //Prepare input for combine + HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); + + HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob); + + reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput()); + reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput()); + + reduceCtx.run(); + + reduceCtx.taskInfo(new HadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); + + reduceCtx.run(); + + assertEquals( + "blue\t200\n" + + "green\t150\n" + + "red\t100\n" + + "yellow\t70\n", + readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000") + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java new file mode 100644 index 0000000..b41a260 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests of Map, Combine and Reduce task executions by running a job that uses Hadoop API v1. + */ +public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { + /** + * Creates a WordCount Hadoop job for API v1. + * + * @param inFile Input file name for the job. + * @param outFile Output file name for the job. + * @return Hadoop job. + * @throws Exception If fails. + */ + @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile); + + setupFileSystems(jobConf); + + HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); + + HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); + + return new HadoopV2Job(jobId, jobInfo, log); + } + + /** {@inheritDoc} */ + @Override public String getOutputFileNamePrefix() { + return "part-"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java new file mode 100644 index 0000000..b677c63 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +
+import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests of Map, Combine and Reduce task executions by running a job that uses Hadoop API v2. + */ +public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { + /** + * Creates a WordCount Hadoop job for API v2. + * + * @param inFile Input file name for the job. + * @param outFile Output file name for the job. + * @return Hadoop job. + * @throws Exception If fails. + */ + @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + Job job = Job.getInstance(); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + HadoopWordCount2.setTasksClasses(job, true, true, true); + + Configuration conf = job.getConfiguration(); + + setupFileSystems(conf); + + FileInputFormat.setInputPaths(job, new Path(inFile)); + FileOutputFormat.setOutputPath(job, new Path(outFile)); + + job.setJarByClass(HadoopWordCount2.class); + + Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile); + + HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); + + HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); + + return new HadoopV2Job(jobId, jobInfo, log); + } + + /** {@inheritDoc} */ + @Override public String getOutputFileNamePrefix() { + return "part-r-"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java new file mode 100644 index 0000000..a56c7c7 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.processors.hadoop.planner.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Round-robin MapReduce planner. + */ +public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner { + /** {@inheritDoc} */ + @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top, + @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException { + if (top.isEmpty()) + throw new IllegalArgumentException("Topology is empty"); + + // Has at least one element. + Iterator<ClusterNode> it = top.iterator(); + + Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>(); + + for (HadoopInputSplit block : job.input()) { + ClusterNode node = it.next(); + + Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id()); + + if (nodeBlocks == null) { + nodeBlocks = new ArrayList<>(); + + mappers.put(node.id(), nodeBlocks); + } + + nodeBlocks.add(block); + + if (!it.hasNext()) + it = top.iterator(); + } + + int[] rdc = new int[job.info().reducers()]; + + for (int i = 0; i < rdc.length; i++) + rdc[i] = i; + + return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java new file mode 100644 index 0000000..e444270 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.util.*; + +/** + * Task context for test purposes.
+ */ +class HadoopTestTaskContext extends HadoopV2TaskContext { + /** + * Simple key-value pair. + * @param <K> Key class. + * @param <V> Value class. + */ + public static class Pair<K,V> { + /** Key */ + private K key; + + /** Value */ + private V val; + + /** + * @param key Key. + * @param val Value. + */ + Pair(K key, V val) { + this.key = key; + this.val = val; + } + + /** + * Getter of the key. + * @return Key. + */ + K key() { + return key; + } + + /** + * Getter of the value. + * @return Value. + */ + V value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return key + "," + val; + } + } + + /** Mock output container: result data of the task execution if the output is not overridden. */ + private List<Pair<String, Integer>> mockOutput = new ArrayList<>(); + + /** Mock input container: input data if the input is not overridden. */ + private Map<Object,List> mockInput = new TreeMap<>(); + + /** Context output implementation to write data into mockOutput. */ + private HadoopTaskOutput output = new HadoopTaskOutput() { + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) { + // Check the casts and extract/copy the values. + String strKey = new String(((Text)key).getBytes()); + int intVal = ((IntWritable)val).get(); + + mockOutput().add(new Pair<>(strKey, intVal)); + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new UnsupportedOperationException(); + } + }; + + /** Context input implementation to read data from mockInput. */ + private HadoopTaskInput input = new HadoopTaskInput() { + /** Iterator of keys and associated lists of values. */ + Iterator<Map.Entry<Object, List>> iter; + + /** Current key and associated value list. */ + Map.Entry<Object, List> currEntry; + + /** {@inheritDoc} */ + @Override public boolean next() { + if (iter == null) + iter = mockInput().entrySet().iterator(); + + if (iter.hasNext()) + currEntry = iter.next(); + else + currEntry = null; + + return currEntry != null; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return currEntry.getKey(); + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + return currEntry.getValue().iterator(); + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new UnsupportedOperationException(); + } + }; + + /** + * Getter of the mock output container: the result of the task if the output is not overridden. + * + * @return Mock output. + */ + public List<Pair<String, Integer>> mockOutput() { + return mockOutput; + } + + /** + * Getter of the mock input container: input data if the input is not overridden. + * + * @return Mock input. + */ + public Map<Object, List> mockInput() { + return mockInput; + } + + /** + * Generates a one-key-multiple-values tree from an array of key-value pairs and wraps it into Writable objects. + * The result is placed into the mock input. + * + * @param flatData List of key-value pairs. + */ + public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) { + Text key = new Text(); + + for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) { + key.set(pair.key); + ArrayList<IntWritable> valList; + + if (!mockInput.containsKey(key)) { + valList = new ArrayList<>(); + mockInput.put(key, valList); + key = new Text(); + } + else + valList = (ArrayList<IntWritable>) mockInput.get(key); + valList.add(new IntWritable(pair.value())); + } + } + + /** + * @param taskInfo Task info. + * @param gridJob Grid Hadoop job.
+ */ + public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException { + super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); + } + + /** + * Creates DataInput to read JobConf. + * + * @param job Job. + * @return DataInput with JobConf. + * @throws IgniteCheckedException If failed. + */ + private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException { + JobConf jobConf = new JobConf(); + + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + try { + jobConf.write(new DataOutputStream(buf)); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + } + + /** {@inheritDoc} */ + @Override public HadoopTaskOutput output() { + return output; + } + + /** {@inheritDoc} */ + @Override public HadoopTaskInput input() { + return input; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java new file mode 100644 index 0000000..ef60762 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +import static org.junit.Assert.*; + +/** + * Utility class for tests. + */ +public class HadoopTestUtils { + /** + * Checks that job statistics file contains valid strings only. + * + * @param reader Buffered reader to get lines of job statistics. + * @return Amount of events. + * @throws IOException If failed. 
+ */ + public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { + Collection<String> phases = new HashSet<>(); + + phases.add("submit"); + phases.add("prepare"); + phases.add("start"); + phases.add("finish"); + phases.add("requestId"); + phases.add("responseId"); + + Collection<String> evtTypes = new HashSet<>(); + + evtTypes.add("JOB"); + evtTypes.add("SETUP"); + evtTypes.add("MAP"); + evtTypes.add("SHUFFLE"); + evtTypes.add("REDUCE"); + evtTypes.add("COMBINE"); + evtTypes.add("COMMIT"); + + long evtCnt = 0; + String line; + + Map<Long, String> reduceNodes = new HashMap<>(); + + while((line = reader.readLine()) != null) { + String[] splitLine = line.split(":"); + + //Try parse timestamp + Long.parseLong(splitLine[1]); + + String[] evt = splitLine[0].split(" "); + + assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); + + String phase; + + if ("JOB".equals(evt[0])) + phase = evt[1]; + else { + assertEquals(4, evt.length); + assertTrue("The node id is not defined", !F.isEmpty(evt[3])); + + long taskNum = Long.parseLong(evt[1]); + + if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { + String nodeId = reduceNodes.get(taskNum); + + if (nodeId == null) + reduceNodes.put(taskNum, evt[3]); + else + assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); + } + + phase = evt[2]; + } + + assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); + + evtCnt++; + } + + return evtCnt; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java new file mode 100644 index 0000000..ebc89f4 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}. 
+ */ +public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { + /** */ + private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; + + /** + * Custom serialization class that accepts {@link Writable}. + */ + private static class CustomSerialization extends WritableSerialization { + /** {@inheritDoc} */ + @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) { + return new Deserializer<Writable>() { + @Override public void open(InputStream in) { } + + @Override public Writable deserialize(Writable writable) { + return new Text(TEST_SERIALIZED_VALUE); + } + + @Override public void close() { } + }; + } + } + + /** + * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration. + * + * @throws IgniteCheckedException If fails. + */ + public void testCustomSerializationApplying() throws IgniteCheckedException { + JobConf cfg = new JobConf(); + + cfg.setMapOutputKeyClass(IntWritable.class); + cfg.setMapOutputValueClass(Text.class); + cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); + + HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + + HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, + null)); + + HadoopSerialization ser = taskCtx.keySerialization(); + + assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); + + DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0])); + + assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); + + ser = taskCtx.valueSerialization(); + + assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); + + assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); + } +}
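For readers unfamiliar with the mechanism that HadoopV2JobSelfTest exercises above: Hadoop resolves serializers and deserializers through the "io.serializations" configuration key, and any Serialization class listed there whose accept() method matches the requested type wins. The following is a minimal, self-contained sketch of that lookup in plain Hadoop, with no Ignite involved; the class and variable names are illustrative only and are not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.WritableSerialization;

/** Illustrative sketch: how Hadoop picks a deserializer from the "io.serializations" list. */
public class SerializationLookupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Register the serialization implementations Hadoop may use. A custom
        // Serialization class (like CustomSerialization in the test above) would
        // be listed here instead of, or in addition to, WritableSerialization.
        conf.set("io.serializations", WritableSerialization.class.getName());

        // SerializationFactory walks the configured list and returns the first
        // Serialization whose accept() method matches the requested class.
        SerializationFactory factory = new SerializationFactory(conf);

        Deserializer<Text> deser = factory.getDeserializer(Text.class);

        System.out.println("Resolved deserializer: " + deser.getClass().getName());
    }
}

The test above then asserts that, on the Ignite side, whatever Serialization is resolved for the map output key and value classes is handed back wrapped in a HadoopSerializationWrapper, so task code always works through a single serialization entry point.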