http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java deleted file mode 100644 index af3f872..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import com.google.common.base.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Tests of Map, Combine and Reduce task executions of any version of hadoop API. - */ -abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCountTest { - /** Empty hosts array. */ - private static final String[] HOSTS = new String[0]; - - /** - * Creates some grid hadoop job. Override this method to create tests for any job implementation. - * - * @param inFile Input file name for the job. - * @param outFile Output file name for the job. - * @return Hadoop job. - * @throws IOException If fails. - */ - public abstract GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; - - /** - * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API - */ - public abstract String getOutputFileNamePrefix(); - - /** - * Tests map task execution. - * - * @throws Exception If fails. 
- */ - @SuppressWarnings("ConstantConditions") - public void testMapTask() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - URI inFileUri = URI.create(igfsScheme() + inFile.toString()); - - try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) { - pw.println("hello0 world0"); - pw.println("world1 hello1"); - } - - GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1); - - try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) { - pw.println("hello2 world2"); - pw.println("world3 hello3"); - } - GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), - igfs.info(inFile).length() - fileBlock1.length()); - - GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); - - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); - - GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob); - - ctx.mockOutput().clear(); - - ctx.run(); - - assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput())); - - ctx.mockOutput().clear(); - - ctx.taskInfo(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); - - ctx.run(); - - assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput())); - } - - /** - * Generates input data for reduce-like operation into mock context input and runs the operation. - * - * @param gridJob Job is to create reduce task from. - * @param taskType Type of task - combine or reduce. - * @param taskNum Number of task in job. - * @param words Pairs of words and its counts. - * @return Context with mock output. - * @throws IgniteCheckedException If fails. - */ - private GridHadoopTestTaskContext runTaskWithInput(GridHadoopV2Job gridJob, GridHadoopTaskType taskType, - int taskNum, String... words) throws IgniteCheckedException { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); - - GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob); - - for (int i = 0; i < words.length; i+=2) { - List<IntWritable> valList = new ArrayList<>(); - - for (int j = 0; j < Integer.parseInt(words[i + 1]); j++) - valList.add(new IntWritable(1)); - - ctx.mockInput().put(new Text(words[i]), valList); - } - - ctx.run(); - - return ctx; - } - - /** - * Tests reduce task execution. - * - * @throws Exception If fails. - */ - public void testReduceTask() throws Exception { - GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); - - runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); - runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); - - assertEquals( - "word1\t5\n" + - "word2\t10\n", - readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" + - getOutputFileNamePrefix() + "00000") - ); - - assertEquals( - "word3\t7\n" + - "word4\t15\n", - readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" + - getOutputFileNamePrefix() + "00001") - ); - } - - /** - * Tests combine task execution. - * - * @throws Exception If fails. 
- */ - public void testCombinerTask() throws Exception { - GridHadoopV2Job gridJob = getHadoopJob("/", "/"); - - GridHadoopTestTaskContext ctx = - runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); - - assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput())); - - ctx = runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15"); - - assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput())); - } - - /** - * Runs chain of map-combine task on file block. - * - * @param fileBlock block of input file to be processed. - * @param gridJob Hadoop job implementation. - * @return Context of combine task with mock output. - * @throws IgniteCheckedException If fails. - */ - private GridHadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, GridHadoopV2Job gridJob) - throws IgniteCheckedException { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); - - GridHadoopTestTaskContext mapCtx = new GridHadoopTestTaskContext(taskInfo, gridJob); - - mapCtx.run(); - - //Prepare input for combine - taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); - - GridHadoopTestTaskContext combineCtx = new GridHadoopTestTaskContext(taskInfo, gridJob); - - combineCtx.makeTreeOfWritables(mapCtx.mockOutput()); - - combineCtx.run(); - - return combineCtx; - } - - /** - * Tests all job in complex. - * Runs 2 chains of map-combine tasks and sends result into one reduce task. - * - * @throws Exception If fails. - */ - @SuppressWarnings("ConstantConditions") - public void testAllTasks() throws Exception { - IgfsPath inDir = new IgfsPath(PATH_INPUT); - - igfs.mkdirs(inDir); - - IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); - - URI inFileUri = URI.create(igfsScheme() + inFile.toString()); - - generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70); - - //Split file into two blocks - long fileLen = igfs.info(inFile).length(); - - Long l = fileLen / 2; - - GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, l); - GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - - GridHadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); - - GridHadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); - - GridHadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); - - //Prepare input for combine - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); - - GridHadoopTestTaskContext reduceCtx = new GridHadoopTestTaskContext(taskInfo, gridJob); - - reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput()); - reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput()); - - reduceCtx.run(); - - reduceCtx.taskInfo(new GridHadoopTaskInfo(GridHadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); - - reduceCtx.run(); - - assertEquals( - "blue\t200\n" + - "green\t150\n" + - "red\t100\n" + - "yellow\t70\n", - readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000") - ); - } -}
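A note on the hard-coded paths that testReduceTask() above reads: they appear to match the layout Hadoop's FileOutputCommitter uses for committed task output under the job's output directory, combined with the all-zero job id that both concrete subclasses below construct via new GridHadoopJobId(new UUID(0, 0), 0). A sketch of the decomposition, with the field meanings inferred from those tests rather than stated anywhere in this commit: /output/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/part-00000 breaks down as the output directory, the "_temporary" staging directory with application attempt "0", a task directory of "task_" + job id (UUID plus counter) + "_r_" (reduce) + task number, and finally the reducer output file, whose prefix is version-specific ("part-" for the v1 API, "part-r-" for v2) followed by the partition number.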
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java deleted file mode 100644 index 15ac125..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1. - */ -public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest { - /** - * Creates WordCount hadoop job for API v1. - * - * @param inFile Input file name for the job. - * @param outFile Output file name for the job. - * @return Hadoop job. - * @throws IOException If fails. - */ - @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { - JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile); - - setupFileSystems(jobConf); - - GridHadoopDefaultJobInfo jobInfo = createJobInfo(jobConf); - - GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); - - return new GridHadoopV2Job(jobId, jobInfo, log); - } - - /** {@inheritDoc} */ - @Override public String getOutputFileNamePrefix() { - return "part-"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java deleted file mode 100644 index e48eb01..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2. - */ -public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest { - /** - * Creates WordCount hadoop job for API v2. - * - * @param inFile Input file name for the job. - * @param outFile Output file name for the job. - * @return Hadoop job. - * @throws Exception if fails. - */ - @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { - Job job = Job.getInstance(); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - GridHadoopWordCount2.setTasksClasses(job, true, true, true); - - Configuration conf = job.getConfiguration(); - - setupFileSystems(conf); - - FileInputFormat.setInputPaths(job, new Path(inFile)); - FileOutputFormat.setOutputPath(job, new Path(outFile)); - - job.setJarByClass(GridHadoopWordCount2.class); - - Job hadoopJob = GridHadoopWordCount2.getJob(inFile, outFile); - - GridHadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); - - GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); - - return new GridHadoopV2Job(jobId, jobInfo, log); - } - - /** {@inheritDoc} */ - @Override public String getOutputFileNamePrefix() { - return "part-r-"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java deleted file mode 100644 index 5baa8cd..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestRoundRobinMrPlanner.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.processors.hadoop.planner.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Round-robin mr planner. - */ -public class GridHadoopTestRoundRobinMrPlanner implements GridHadoopMapReducePlanner { - /** {@inheritDoc} */ - @Override public GridHadoopMapReducePlan preparePlan(GridHadoopJob job, Collection<ClusterNode> top, - @Nullable GridHadoopMapReducePlan oldPlan) throws IgniteCheckedException { - if (top.isEmpty()) - throw new IllegalArgumentException("Topology is empty"); - - // Has at least one element. - Iterator<ClusterNode> it = top.iterator(); - - Map<UUID, Collection<GridHadoopInputSplit>> mappers = new HashMap<>(); - - for (GridHadoopInputSplit block : job.input()) { - ClusterNode node = it.next(); - - Collection<GridHadoopInputSplit> nodeBlocks = mappers.get(node.id()); - - if (nodeBlocks == null) { - nodeBlocks = new ArrayList<>(); - - mappers.put(node.id(), nodeBlocks); - } - - nodeBlocks.add(block); - - if (!it.hasNext()) - it = top.iterator(); - } - - int[] rdc = new int[job.info().reducers()]; - - for (int i = 0; i < rdc.length; i++) - rdc[i] = i; - - return new GridHadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java deleted file mode 100644 index 4e0aa9b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.util.*; - -/** - * Task context for testing purposes. - */ -class GridHadoopTestTaskContext extends GridHadoopV2TaskContext { - /** - * Simple key-value pair. - * @param <K> Key class. - * @param <V> Value class. - */ - public static class Pair<K,V> { - /** Key. */ - private K key; - - /** Value. */ - private V val; - - /** - * @param key Key. - * @param val Value. - */ - Pair(K key, V val) { - this.key = key; - this.val = val; - } - - /** - * Getter of key. - * @return Key. - */ - K key() { - return key; - } - - /** - * Getter of value. - * @return Value. - */ - V value() { - return val; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return key + "," + val; - } - } - - /** Mock output container: holds result data of task execution unless the output is overridden. */ - private List<Pair<String, Integer>> mockOutput = new ArrayList<>(); - - /** Mock input container: holds input data unless the input is overridden. */ - private Map<Object,List> mockInput = new TreeMap<>(); - - /** Context output implementation to write data into mockOutput. */ - private GridHadoopTaskOutput output = new GridHadoopTaskOutput() { - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) { - // Check casts and extract/copy the values. - String strKey = new String(((Text)key).getBytes()); - int intVal = ((IntWritable)val).get(); - - mockOutput().add(new Pair<>(strKey, intVal)); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new UnsupportedOperationException(); - } - }; - - /** Context input implementation to read data from mockInput. */ - private GridHadoopTaskInput input = new GridHadoopTaskInput() { - /** Iterator of keys and associated lists of values. */ - Iterator<Map.Entry<Object, List>> iter; - - /** Current key and associated value list. */ - Map.Entry<Object, List> currEntry; - - /** {@inheritDoc} */ - @Override public boolean next() { - if (iter == null) - iter = mockInput().entrySet().iterator(); - - if (iter.hasNext()) - currEntry = iter.next(); - else - currEntry = null; - - return currEntry != null; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return currEntry.getKey(); - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - return currEntry.getValue().iterator(); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new UnsupportedOperationException(); - } - }; - - /** - * Getter of the mock output container, the task result unless the output is overridden. - * - * @return Mock output. - */ - public List<Pair<String, Integer>> mockOutput() { - return mockOutput; - } - - /** - * Getter of the mock input container, the task input unless the input is overridden. - * - * @return Mock input. - */ - public Map<Object, List> mockInput() { - return mockInput; - } - - /** - * Generates a one-key-to-multiple-values tree from an array of key-value pairs and wraps them into Writable objects. - * The result is placed into the mock input. - * - * @param flatData List of key-value pairs. 
- */ - public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) { - Text key = new Text(); - - for (GridHadoopTestTaskContext.Pair<String, Integer> pair : flatData) { - key.set(pair.key); - ArrayList<IntWritable> valList; - - if (!mockInput.containsKey(key)) { - valList = new ArrayList<>(); - mockInput.put(key, valList); - key = new Text(); - } - else - valList = (ArrayList<IntWritable>) mockInput.get(key); - valList.add(new IntWritable(pair.value())); - } - } - - /** - * @param taskInfo Task info. - * @param gridJob Grid Hadoop job. - */ - public GridHadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException { - super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); - } - - /** - * Creates DataInput to read JobConf. - * - * @param job Job. - * @return DataInput with JobConf. - * @throws IgniteCheckedException If failed. - */ - private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException { - JobConf jobConf = new JobConf(); - - for (Map.Entry<String, String> e : ((GridHadoopDefaultJobInfo)job.info()).properties().entrySet()) - jobConf.set(e.getKey(), e.getValue()); - - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - try { - jobConf.write(new DataOutputStream(buf)); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskOutput output() { - return output; - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input() { - return input; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java deleted file mode 100644 index cdbb809..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestUtils.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -import static org.junit.Assert.*; - -/** - * Utility class for tests. - */ -public class GridHadoopTestUtils { - /** - * Checks that job statistics file contains valid strings only. - * - * @param reader Buffered reader to get lines of job statistics. - * @return Amount of events. 
- * @throws IOException If failed. - */ - public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException { - Collection<String> phases = new HashSet<>(); - - phases.add("submit"); - phases.add("prepare"); - phases.add("start"); - phases.add("finish"); - phases.add("requestId"); - phases.add("responseId"); - - Collection<String> evtTypes = new HashSet<>(); - - evtTypes.add("JOB"); - evtTypes.add("SETUP"); - evtTypes.add("MAP"); - evtTypes.add("SHUFFLE"); - evtTypes.add("REDUCE"); - evtTypes.add("COMBINE"); - evtTypes.add("COMMIT"); - - long evtCnt = 0; - String line; - - Map<Long, String> reduceNodes = new HashMap<>(); - - while((line = reader.readLine()) != null) { - String[] splitLine = line.split(":"); - - //Try parse timestamp - Long.parseLong(splitLine[1]); - - String[] evt = splitLine[0].split(" "); - - assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0])); - - String phase; - - if ("JOB".equals(evt[0])) - phase = evt[1]; - else { - assertEquals(4, evt.length); - assertTrue("The node id is not defined", !F.isEmpty(evt[3])); - - long taskNum = Long.parseLong(evt[1]); - - if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) { - String nodeId = reduceNodes.get(taskNum); - - if (nodeId == null) - reduceNodes.put(taskNum, evt[3]); - else - assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]); - } - - phase = evt[2]; - } - - assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase)); - - evtCnt++; - } - - return evtCnt; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java deleted file mode 100644 index b201614..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Self test of {@link GridHadoopV2Job}. 
- */ -public class GridHadoopV2JobSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; - - /** - * Custom serialization class that accepts {@link Writable}. - */ - private static class CustomSerialization extends WritableSerialization { - /** {@inheritDoc} */ - @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) { - return new Deserializer<Writable>() { - @Override public void open(InputStream in) { } - - @Override public Writable deserialize(Writable writable) { - return new Text(TEST_SERIALIZED_VALUE); - } - - @Override public void close() { } - }; - } - } - - /** - * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration. - * - * @throws IgniteCheckedException If fails. - */ - public void testCustomSerializationApplying() throws IgniteCheckedException { - JobConf cfg = new JobConf(); - - cfg.setMapOutputKeyClass(IntWritable.class); - cfg.setMapOutputValueClass(Text.class); - cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - - GridHadoopJob job = new GridHadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); - - GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0, - null)); - - GridHadoopSerialization ser = taskCtx.keySerialization(); - - assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName()); - - DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0])); - - assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); - - ser = taskCtx.valueSerialization(); - - assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName()); - - assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java deleted file mode 100644 index 051d073..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopValidationSelfTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.configuration.*; - -/** - * Configuration validation tests. 
- */ -public class GridHadoopValidationSelfTest extends GridHadoopAbstractSelfTest { - /** Peer class loading enabled flag. */ - public boolean peerClassLoading; - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - - peerClassLoading = false; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPeerClassLoadingEnabled(peerClassLoading); - - return cfg; - } - - /** - * Ensures that the grid starts when all configuration parameters are valid. - * - * @throws Exception If failed. - */ - public void testValid() throws Exception { - startGrids(1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java new file mode 100644 index 0000000..7fda532 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Abstract class for Hadoop tests. + */ +public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** REST port. */ + protected static final int REST_PORT = 11212; + + /** IGFS name. */ + protected static final String igfsName = null; + + /** IGFS meta cache name. */ + protected static final String igfsMetaCacheName = "meta"; + + /** IGFS data cache name. */ + protected static final String igfsDataCacheName = "data"; + + /** IGFS block size. 
*/ + protected static final int igfsBlockSize = 1024; + + /** IGFS block group size. */ + protected static final int igfsBlockGroupSize = 8; + + /** Initial REST port. */ + private int restPort = REST_PORT; + + /** Initial classpath. */ + private static String initCp; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // Add surefire classpath to regular classpath. + initCp = System.getProperty("java.class.path"); + + String surefireCp = System.getProperty("surefire.test.class.path"); + + if (surefireCp != null) + System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp); + + super.beforeTestsStarted(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + // Restore classpath. + System.setProperty("java.class.path", initCp); + + initCp = null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setHadoopConfiguration(hadoopConfiguration(gridName)); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + if (igfsEnabled()) { + cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration()); + + cfg.setFileSystemConfiguration(igfsConfiguration()); + } + + if (restEnabled()) { + ConnectorConfiguration clnCfg = new ConnectorConfiguration(); + + clnCfg.setPort(restPort++); + + cfg.setConnectorConfiguration(clnCfg); + } + + cfg.setLocalHost("127.0.0.1"); + cfg.setPeerClassLoadingEnabled(false); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Hadoop configuration. + */ + public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = new HadoopConfiguration(); + + cfg.setMaxParallelTasks(3); + + return cfg; + } + + /** + * @return IGFS configuration. + */ + public FileSystemConfiguration igfsConfiguration() { + FileSystemConfiguration cfg = new FileSystemConfiguration(); + + cfg.setName(igfsName); + cfg.setBlockSize(igfsBlockSize); + cfg.setDataCacheName(igfsDataCacheName); + cfg.setMetaCacheName(igfsMetaCacheName); + cfg.setFragmentizerEnabled(false); + + return cfg; + } + + /** + * @return IGFS meta cache configuration. + */ + public CacheConfiguration metaCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(igfsMetaCacheName); + cfg.setCacheMode(REPLICATED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return IGFS data cache configuration. + */ + private CacheConfiguration dataCacheConfiguration() { + CacheConfiguration cfg = new CacheConfiguration(); + + cfg.setName(igfsDataCacheName); + cfg.setCacheMode(PARTITIONED); + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize)); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + return cfg; + } + + /** + * @return {@code True} if IGFS is enabled on Hadoop nodes. + */ + protected boolean igfsEnabled() { + return false; + } + + /** + * @return {@code True} if REST is enabled on Hadoop nodes. + */ + protected boolean restEnabled() { + return false; + } + + /** + * @return Number of nodes to start. 
+ */ + protected int gridCount() { + return 3; + } + + /** + * @param cfg Config. + */ + protected void setupFileSystems(Configuration cfg) { + cfg.set("fs.defaultFS", igfsScheme()); + cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); + cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem. + class.getName()); + + HadoopFileSystemsUtils.setupFileSystems(cfg); + } + + /** + * @return IGFS scheme for test. + */ + protected String igfsScheme() { + return "igfs://:" + getTestGridName(0) + "@/"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java new file mode 100644 index 0000000..1390982 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; + +import java.io.*; +import java.util.*; + +/** + * Abstract class for tests based on WordCount test job. + */ +public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest { + /** Input path. */ + protected static final String PATH_INPUT = "/input"; + + /** Output path. */ + protected static final String PATH_OUTPUT = "/output"; + + /** IGFS instance. */ + protected IgfsEx igfs; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + // Init cache by correct LocalFileSystem implementation + FileSystem.getLocal(cfg); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** + * Generates test file. + * + * @param path File name. + * @param wordCounts Words and counts. + * @throws Exception If failed. 
+ */ + protected void generateTestFile(String path, Object... wordCounts) throws Exception { + List<String> wordsArr = new ArrayList<>(); + + //Generating + for (int i = 0; i < wordCounts.length; i += 2) { + String word = (String) wordCounts[i]; + int cnt = (Integer) wordCounts[i + 1]; + + while (cnt-- > 0) + wordsArr.add(word); + } + + //Shuffling + for (int i = 0; i < wordsArr.size(); i++) { + int j = (int)(Math.random() * wordsArr.size()); + + Collections.swap(wordsArr, i, j); + } + + //Input file preparing + PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true)); + + int j = 0; + + while (j < wordsArr.size()) { + int i = 5 + (int)(Math.random() * 5); + + List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); + j += i; + + testInputFileWriter.println(Joiner.on(' ').join(subList)); + } + + testInputFileWriter.close(); + } + + /** + * Reads whole text file into String. + * + * @param fileName Name of the file to read. + * @return Content of the file as String value. + * @throws Exception If could not read the file. + */ + protected String readAndSortFile(String fileName) throws Exception { + BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName)))); + + List<String> list = new ArrayList<>(); + + String line; + + while ((line = reader.readLine()) != null) + list.add(line); + + Collections.sort(list); + + return Joiner.on('\n').join(list) + "\n"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java new file mode 100644 index 0000000..a3289cb --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import junit.framework.*; +import org.apache.hadoop.mapreduce.*; + +/** + * + */ +public class HadoopClassLoaderTest extends TestCase { + /** */ + HadoopClassLoader ldr = new HadoopClassLoader(null); + + /** + * @throws Exception If failed. 
+ */ + public void testClassLoading() throws Exception { + assertNotSame(Test1.class, ldr.loadClass(Test1.class.getName())); + assertNotSame(Test2.class, ldr.loadClass(Test2.class.getName())); + assertSame(Test3.class, ldr.loadClass(Test3.class.getName())); + } + +// public void testDependencySearch() { +// assertTrue(ldr.hasExternalDependencies(Test1.class.getName(), new HashSet<String>())); +// assertTrue(ldr.hasExternalDependencies(Test2.class.getName(), new HashSet<String>())); +// } + + /** + * + */ + private static class Test1 { + /** */ + Test2 t2; + + /** */ + Job[][] jobs = new Job[4][4]; + } + + /** + * + */ + private static abstract class Test2 { + /** */ + abstract Test1 t1(); + } + + /** + * + */ + private static class Test3 { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java new file mode 100644 index 0000000..33fa358 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.*; +import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +/** + * Test of integration with Hadoop client via command line interface. + */ +public class HadoopCommandLineTest extends GridCommonAbstractTest { + /** IGFS instance. */ + private IgfsEx igfs; + + /** */ + private static final String igfsName = "igfs"; + + /** */ + private static File testWorkDir; + + /** */ + private static String hadoopHome; + + /** */ + private static String hiveHome; + + /** */ + private static File examplesJar; + + /** + * + * @param path File name. + * @param wordCounts Words and counts. + * @throws Exception If failed. + */ + private void generateTestFile(File path, Object... 
wordCounts) throws Exception { + List<String> wordsArr = new ArrayList<>(); + + //Generating + for (int i = 0; i < wordCounts.length; i += 2) { + String word = (String) wordCounts[i]; + int cnt = (Integer) wordCounts[i + 1]; + + while (cnt-- > 0) + wordsArr.add(word); + } + + //Shuffling + for (int i = 0; i < wordsArr.size(); i++) { + int j = (int)(Math.random() * wordsArr.size()); + + Collections.swap(wordsArr, i, j); + } + + //Writing file + try (PrintWriter writer = new PrintWriter(path)) { + int j = 0; + + while (j < wordsArr.size()) { + int i = 5 + (int)(Math.random() * 5); + + List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); + j += i; + + writer.println(Joiner.on(' ').join(subList)); + } + + writer.flush(); + } + } + + /** + * Generates two data files to be joined with Hive. + * + * @throws FileNotFoundException If failed. + */ + private void generateHiveTestFiles() throws FileNotFoundException { + try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a")); + PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) { + char sep = '\t'; + + int idB = 0; + int idA = 0; + int v = 1000; + + for (int i = 0; i < 1000; i++) { + writerA.print(idA++); + writerA.print(sep); + writerA.println(idB); + + writerB.print(idB++); + writerB.print(sep); + writerB.println(v += 2); + + writerB.print(idB++); + writerB.print(sep); + writerB.println(v += 2); + } + + writerA.flush(); + writerB.flush(); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + hiveHome = IgniteSystemProperties.getString("HIVE_HOME"); + + assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome)); + + hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME"); + + assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome)); + + String mapredHome = hadoopHome + "/share/hadoop/mapreduce"; + + File[] fileList = new File(mapredHome).listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().startsWith("hadoop-mapreduce-examples-") && + pathname.getName().endsWith(".jar"); + } + }); + + assertEquals("Invalid hadoop distribution.", 1, fileList.length); + + examplesJar = fileList[0]; + + testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile(); + + U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false); + + File srcFile = U.resolveIgnitePath("docs/mapred-site.ignite.xml"); + File dstFile = new File(testWorkDir, "mapred-site.xml"); + + try (BufferedReader in = new BufferedReader(new FileReader(srcFile)); + PrintWriter out = new PrintWriter(dstFile)) { + String line; + + while ((line = in.readLine()) != null) { + if (line.startsWith("</configuration>")) + out.println( + " <property>\n" + + " <name>" + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" + + " <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" + + " </property>\n"); + + out.println(line); + } + + out.flush(); + } + + generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50); + + generateHiveTestFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + U.delete(testWorkDir); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName); + } + + /** 
{@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * Creates a process builder with the appropriate environment to run the Hadoop CLI. + * + * @return Process builder. + */ + private ProcessBuilder createProcessBuilder() { + String sep = ":"; + + String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + + HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + + ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + + ProcessBuilder res = new ProcessBuilder(); + + res.environment().put("HADOOP_HOME", hadoopHome); + res.environment().put("HADOOP_CLASSPATH", ggClsPath); + res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath()); + + res.redirectErrorStream(true); + + return res; + } + + /** + * Waits for the process to exit and prints its output. + * + * @param proc Process. + * @return Exit code. + * @throws Exception If failed. + */ + private int watchProcess(Process proc) throws Exception { + BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); + + String line; + + while ((line = reader.readLine()) != null) + log().info(line); + + return proc.waitFor(); + } + + /** + * Executes Hadoop command line tool. + * + * @param args Arguments for Hadoop command line tool. + * @return Process exit code. + * @throws Exception If failed. + */ + private int executeHadoopCmd(String... args) throws Exception { + ProcessBuilder procBuilder = createProcessBuilder(); + + List<String> cmd = new ArrayList<>(); + + cmd.add(hadoopHome + "/bin/hadoop"); + cmd.addAll(Arrays.asList(args)); + + procBuilder.command(cmd); + + log().info("Execute: " + procBuilder.command()); + + return watchProcess(procBuilder.start()); + } + + /** + * Executes Hive query. + * + * @param qry Query. + * @return Process exit code. + * @throws Exception If failed. + */ + private int executeHiveQuery(String qry) throws Exception { + ProcessBuilder procBuilder = createProcessBuilder(); + + List<String> cmd = new ArrayList<>(); + + cmd.add(hiveHome + "/bin/hive"); + + cmd.add("--hiveconf"); + cmd.add("hive.rpc.query.plan=true"); + + cmd.add("--hiveconf"); + cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" + + "databaseName=metastore_db;create=true"); + + cmd.add("-e"); + cmd.add(qry); + + procBuilder.command(cmd); + + log().info("Execute: " + procBuilder.command()); + + return watchProcess(procBuilder.start()); + } + + /** + * Tests Hadoop command line integration. 
+ */ + public void testHadoopCommandLine() throws Exception { + assertEquals(0, executeHadoopCmd("fs", "-ls", "/")); + + assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input")); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input")); + + assertTrue(igfs.exists(new IgfsPath("/input/test-data"))); + + assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output")); + + IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/"); + + assertTrue(igfs.exists(path)); + + IgfsPath jobStatPath = null; + + for (IgfsPath jobPath : igfs.listPaths(path)) { + assertNull(jobStatPath); + + jobStatPath = jobPath; + } + + File locStatFile = new File(testWorkDir, "performance"); + + assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString())); + + long evtCnt = HadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); + + assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner. + + assertTrue(igfs.exists(new IgfsPath("/output"))); + + BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000")))); + + List<String> res = new ArrayList<>(); + + String line; + + while ((line = in.readLine()) != null) + res.add(line); + + Collections.sort(res); + + assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString()); + } + + /** + * Runs query check result. + * + * @param expRes Expected result. + * @param qry Query. + * @throws Exception If failed. + */ + private void checkQuery(String expRes, String qry) throws Exception { + assertEquals(0, executeHiveQuery("drop table if exists result")); + + assertEquals(0, executeHiveQuery( + "create table result " + + "row format delimited fields terminated by ' ' " + + "stored as textfile " + + "location '/result' as " + qry + )); + + IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0")); + + byte[] buf = new byte[(int) in.length()]; + + in.read(buf); + + assertEquals(expRes, new String(buf)); + } + + /** + * Tests Hive integration. 
+ */ + public void testHiveCommandLine() throws Exception { + assertEquals(0, executeHiveQuery( + "create table table_a (" + + "id_a int," + + "id_b int" + + ") " + + "row format delimited fields terminated by '\\t'" + + "stored as textfile " + + "location '/table-a'" + )); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a")); + + assertEquals(0, executeHiveQuery( + "create table table_b (" + + "id_b int," + + "rndv int" + + ") " + + "row format delimited fields terminated by '\\t'" + + "stored as textfile " + + "location '/table-b'" + )); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b")); + + checkQuery( + "0 0\n" + + "1 2\n" + + "2 4\n" + + "3 6\n" + + "4 8\n" + + "5 10\n" + + "6 12\n" + + "7 14\n" + + "8 16\n" + + "9 18\n", + "select * from table_a order by id_a limit 10" + ); + + checkQuery("2000\n", "select count(id_b) from table_b"); + + checkQuery( + "250 500 2002\n" + + "251 502 2006\n" + + "252 504 2010\n" + + "253 506 2014\n" + + "254 508 2018\n" + + "255 510 2022\n" + + "256 512 2026\n" + + "257 514 2030\n" + + "258 516 2034\n" + + "259 518 2038\n", + "select a.id_a, a.id_b, b.rndv" + + " from table_a a" + + " inner join table_b b on a.id_b = b.id_b" + + " where b.rndv > 2000" + + " order by a.id_a limit 10" + ); + + checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b"); + } +}
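For reference, the expected Hive results above follow directly from generateHiveTestFiles(): on iteration i it writes the row (id_a = i, id_b = 2i) to data-a, and the two rows (id_b, rndv = 1002 + 2 * id_b) to data-b. So table_b holds 2000 rows (matching the count(id_b) check), every table_a row joins exactly one table_b row (matching the final count of 1000), and rndv > 2000 first holds at id_b = 500, i.e. id_a = 250 with rndv = 1002 + 2 * 500 = 2002, which is the first expected row of the filtered join.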