http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java new file mode 100644 index 0000000..1390982 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractWordCountTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; + +import java.io.*; +import java.util.*; + +/** + * Abstract class for tests based on WordCount test job. + */ +public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest { + /** Input path. */ + protected static final String PATH_INPUT = "/input"; + + /** Output path. */ + protected static final String PATH_OUTPUT = "/output"; + + /** IGFS instance. */ + protected IgfsEx igfs; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + // Init cache by correct LocalFileSystem implementation + FileSystem.getLocal(cfg); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** + * Generates test file. + * + * @param path File name. + * @param wordCounts Words and counts. + * @throws Exception If failed. + */ + protected void generateTestFile(String path, Object... 
wordCounts) throws Exception { + List<String> wordsArr = new ArrayList<>(); + + //Generating + for (int i = 0; i < wordCounts.length; i += 2) { + String word = (String) wordCounts[i]; + int cnt = (Integer) wordCounts[i + 1]; + + while (cnt-- > 0) + wordsArr.add(word); + } + + //Shuffling + for (int i = 0; i < wordsArr.size(); i++) { + int j = (int)(Math.random() * wordsArr.size()); + + Collections.swap(wordsArr, i, j); + } + + //Input file preparing + PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true)); + + int j = 0; + + while (j < wordsArr.size()) { + int i = 5 + (int)(Math.random() * 5); + + List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); + j += i; + + testInputFileWriter.println(Joiner.on(' ').join(subList)); + } + + testInputFileWriter.close(); + } + + /** + * Reads whole text file into String. + * + * @param fileName Name of the file to read. + * @return Content of the file as String value. + * @throws Exception If could not read the file. + */ + protected String readAndSortFile(String fileName) throws Exception { + BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName)))); + + List<String> list = new ArrayList<>(); + + String line; + + while ((line = reader.readLine()) != null) + list.add(line); + + Collections.sort(list); + + return Joiner.on('\n').join(list) + "\n"; + } +}
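[Editor's note] For reference, a minimal sketch (not part of the commit) of how a concrete test might combine the helpers defined above: generateTestFile(), readAndSortFile() and the protected igfs field. The subclass name, the elided job-submission step and the exact output file name are assumptions for illustration only; the actual consumers are HadoopMapReduceTest and HadoopMapReduceEmbeddedSelfTest shown further down in this commit.

package org.apache.ignite.internal.processors.hadoop;

import org.apache.ignite.igfs.*;

/**
 * Illustrative sketch only: shows the intended usage pattern of the abstract word-count helpers.
 */
public class HadoopWordCountUsageExample extends HadoopAbstractWordCountTest {
    /**
     * Word-count round trip through IGFS using the helpers above.
     *
     * @throws Exception If failed.
     */
    public void testWordCountRoundTrip() throws Exception {
        // Ensure the input directory exists in IGFS.
        igfs.mkdirs(new IgfsPath(PATH_INPUT));

        // Write a shuffled input file with known word frequencies into IGFS.
        generateTestFile(PATH_INPUT + "/test-data", "red", 100, "blue", 150);

        // ... submit the word-count job against PATH_INPUT / PATH_OUTPUT here (elided) ...

        // Read the reducer output back, sorted line by line, and compare with the
        // expected word counts. The "part-r-00000" name assumes the new-API reducer.
        String out = readAndSortFile(PATH_OUTPUT + "/part-r-00000");

        assertEquals("blue\t150\nred\t100\n", out);
    }
}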
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java new file mode 100644 index 0000000..733ed01 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.*; +import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.nio.file.*; +import java.util.*; + +/** + * Test of integration with Hadoop client via command line interface. + */ +public class HadoopCommandLineTest extends GridCommonAbstractTest { + /** IGFS instance. */ + private IgfsEx igfs; + + /** */ + private static final String igfsName = "igfs"; + + /** */ + private static File testWorkDir; + + /** */ + private static String hadoopHome; + + /** */ + private static String hiveHome; + + /** */ + private static File examplesJar; + + /** + * + * @param path File name. + * @param wordCounts Words and counts. + * @throws Exception If failed. + */ + private void generateTestFile(File path, Object... wordCounts) throws Exception { + List<String> wordsArr = new ArrayList<>(); + + //Generating + for (int i = 0; i < wordCounts.length; i += 2) { + String word = (String) wordCounts[i]; + int cnt = (Integer) wordCounts[i + 1]; + + while (cnt-- > 0) + wordsArr.add(word); + } + + //Shuffling + for (int i = 0; i < wordsArr.size(); i++) { + int j = (int)(Math.random() * wordsArr.size()); + + Collections.swap(wordsArr, i, j); + } + + //Writing file + try (PrintWriter writer = new PrintWriter(path)) { + int j = 0; + + while (j < wordsArr.size()) { + int i = 5 + (int)(Math.random() * 5); + + List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size())); + j += i; + + writer.println(Joiner.on(' ').join(subList)); + } + + writer.flush(); + } + } + + /** + * Generates two data files to join its with Hive. + * + * @throws FileNotFoundException If failed. 
+ */ + private void generateHiveTestFiles() throws FileNotFoundException { + try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a")); + PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) { + char sep = '\t'; + + int idB = 0; + int idA = 0; + int v = 1000; + + for (int i = 0; i < 1000; i++) { + writerA.print(idA++); + writerA.print(sep); + writerA.println(idB); + + writerB.print(idB++); + writerB.print(sep); + writerB.println(v += 2); + + writerB.print(idB++); + writerB.print(sep); + writerB.println(v += 2); + } + + writerA.flush(); + writerB.flush(); + } + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + hiveHome = IgniteSystemProperties.getString("HIVE_HOME"); + + assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome)); + + hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME"); + + assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome)); + + String mapredHome = hadoopHome + "/share/hadoop/mapreduce"; + + File[] fileList = new File(mapredHome).listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().startsWith("hadoop-mapreduce-examples-") && + pathname.getName().endsWith(".jar"); + } + }); + + assertEquals("Invalid hadoop distribution.", 1, fileList.length); + + examplesJar = fileList[0]; + + testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile(); + + U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false); + + File srcFile = U.resolveIgnitePath("docs/mapred-site.ignite.xml"); + File dstFile = new File(testWorkDir, "mapred-site.xml"); + + try (BufferedReader in = new BufferedReader(new FileReader(srcFile)); + PrintWriter out = new PrintWriter(dstFile)) { + String line; + + while ((line = in.readLine()) != null) { + if (line.startsWith("</configuration>")) + out.println( + " <property>\n" + + " <name>" + HadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" + + " <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" + + " </property>\n"); + + out.println(line); + } + + out.flush(); + } + + generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50); + + generateHiveTestFiles(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + U.delete(testWorkDir); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * Creates the process build with appropriate environment to run Hadoop CLI. + * + * @return Process builder. 
+ */ + private ProcessBuilder createProcessBuilder() { + String sep = ":"; + + String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + + HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep + + ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + + ProcessBuilder res = new ProcessBuilder(); + + res.environment().put("HADOOP_HOME", hadoopHome); + res.environment().put("HADOOP_CLASSPATH", ggClsPath); + res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath()); + + res.redirectErrorStream(true); + + return res; + } + + /** + * Waits for process exit and prints the its output. + * + * @param proc Process. + * @return Exit code. + * @throws Exception If failed. + */ + private int watchProcess(Process proc) throws Exception { + BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); + + String line; + + while ((line = reader.readLine()) != null) + log().info(line); + + return proc.waitFor(); + } + + /** + * Executes Hadoop command line tool. + * + * @param args Arguments for Hadoop command line tool. + * @return Process exit code. + * @throws Exception If failed. + */ + private int executeHadoopCmd(String... args) throws Exception { + ProcessBuilder procBuilder = createProcessBuilder(); + + List<String> cmd = new ArrayList<>(); + + cmd.add(hadoopHome + "/bin/hadoop"); + cmd.addAll(Arrays.asList(args)); + + procBuilder.command(cmd); + + log().info("Execute: " + procBuilder.command()); + + return watchProcess(procBuilder.start()); + } + + /** + * Executes Hive query. + * + * @param qry Query. + * @return Process exit code. + * @throws Exception If failed. + */ + private int executeHiveQuery(String qry) throws Exception { + ProcessBuilder procBuilder = createProcessBuilder(); + + List<String> cmd = new ArrayList<>(); + + procBuilder.command(cmd); + + cmd.add(hiveHome + "/bin/hive"); + + cmd.add("--hiveconf"); + cmd.add("hive.rpc.query.plan=true"); + + cmd.add("--hiveconf"); + cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" + + "databaseName=metastore_db;create=true"); + + cmd.add("-e"); + cmd.add(qry); + + procBuilder.command(cmd); + + log().info("Execute: " + procBuilder.command()); + + return watchProcess(procBuilder.start()); + } + + /** + * Tests Hadoop command line integration. + */ + public void testHadoopCommandLine() throws Exception { + assertEquals(0, executeHadoopCmd("fs", "-ls", "/")); + + assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input")); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input")); + + assertTrue(igfs.exists(new IgfsPath("/input/test-data"))); + + assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output")); + + IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/"); + + assertTrue(igfs.exists(path)); + + IgfsPath jobStatPath = null; + + for (IgfsPath jobPath : igfs.listPaths(path)) { + assertNull(jobStatPath); + + jobStatPath = jobPath; + } + + File locStatFile = new File(testWorkDir, "performance"); + + assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString())); + + long evtCnt = GridHadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile))); + + assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner. 
+ + assertTrue(igfs.exists(new IgfsPath("/output"))); + + BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000")))); + + List<String> res = new ArrayList<>(); + + String line; + + while ((line = in.readLine()) != null) + res.add(line); + + Collections.sort(res); + + assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString()); + } + + /** + * Runs query check result. + * + * @param expRes Expected result. + * @param qry Query. + * @throws Exception If failed. + */ + private void checkQuery(String expRes, String qry) throws Exception { + assertEquals(0, executeHiveQuery("drop table if exists result")); + + assertEquals(0, executeHiveQuery( + "create table result " + + "row format delimited fields terminated by ' ' " + + "stored as textfile " + + "location '/result' as " + qry + )); + + IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0")); + + byte[] buf = new byte[(int) in.length()]; + + in.read(buf); + + assertEquals(expRes, new String(buf)); + } + + /** + * Tests Hive integration. + */ + public void testHiveCommandLine() throws Exception { + assertEquals(0, executeHiveQuery( + "create table table_a (" + + "id_a int," + + "id_b int" + + ") " + + "row format delimited fields terminated by '\\t'" + + "stored as textfile " + + "location '/table-a'" + )); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a")); + + assertEquals(0, executeHiveQuery( + "create table table_b (" + + "id_b int," + + "rndv int" + + ") " + + "row format delimited fields terminated by '\\t'" + + "stored as textfile " + + "location '/table-b'" + )); + + assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b")); + + checkQuery( + "0 0\n" + + "1 2\n" + + "2 4\n" + + "3 6\n" + + "4 8\n" + + "5 10\n" + + "6 12\n" + + "7 14\n" + + "8 16\n" + + "9 18\n", + "select * from table_a order by id_a limit 10" + ); + + checkQuery("2000\n", "select count(id_b) from table_b"); + + checkQuery( + "250 500 2002\n" + + "251 502 2006\n" + + "252 504 2010\n" + + "253 506 2014\n" + + "254 508 2018\n" + + "255 510 2022\n" + + "256 512 2026\n" + + "257 514 2030\n" + + "258 516 2034\n" + + "259 518 2038\n", + "select a.id_a, a.id_b, b.rndv" + + " from table_a a" + + " inner join table_b b on a.id_b = b.id_b" + + " where b.rndv > 2000" + + " order by a.id_a limit 10" + ); + + checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index 2e2b5cb..e072592 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -39,7 +39,7 @@ import java.util.*; /** * */ -public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSelfTest { +public class HadoopDefaultMapReducePlannerSelfTest extends 
HadoopAbstractSelfTest { /** */ private static final UUID ID_1 = new UUID(0, 1); @@ -104,9 +104,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @throws IgniteCheckedException If failed. */ public void testIgfsOneBlockPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); - GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); - GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); + HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); + HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); + HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1)); mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2)); @@ -164,9 +164,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @throws IgniteCheckedException If failed. */ public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); - GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); - GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); + HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); + HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); + HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); plan(1, split1); assert ensureMappers(ID_1, split1); @@ -220,9 +220,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @throws IgniteCheckedException If failed. */ public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); + HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2)); mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2)); @@ -266,9 +266,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @throws IgniteCheckedException If failed. */ public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); + HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); + HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); plan(1, split1); assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || @@ -308,8 +308,8 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @throws IgniteCheckedException If failed. 
*/ public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); - GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); + HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); + HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3)); mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3)); @@ -344,9 +344,9 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @throws IgniteCheckedException If failed. */ public void testNonIgfsOrphans() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); - GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); - GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); + HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); + HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); + HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); plan(1, split1); assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || @@ -400,11 +400,11 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @return Plan. * @throws IgniteCheckedException If failed. */ - private static GridHadoopMapReducePlan plan(int reducers, GridHadoopInputSplit... splits) throws IgniteCheckedException { + private static GridHadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException { assert reducers > 0; assert splits != null && splits.length > 0; - Collection<GridHadoopInputSplit> splitList = new ArrayList<>(splits.length); + Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length); Collections.addAll(splitList, splits); @@ -436,12 +436,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @param expSplits Expected splits. * @return {@code True} if this assumption is valid. */ - private static boolean ensureMappers(UUID nodeId, GridHadoopInputSplit... expSplits) { - Collection<GridHadoopInputSplit> expSplitsCol = new ArrayList<>(); + private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) { + Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>(); Collections.addAll(expSplitsCol, expSplits); - Collection<GridHadoopInputSplit> splits = PLAN.get().mappers(nodeId); + Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId); return F.eq(expSplitsCol, splits); } @@ -479,10 +479,10 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @param hosts Hosts. * @return Split. */ - private static GridHadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { + private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { URI uri = URI.create((igfs ? 
"igfs://igfs@" : "hdfs://") + file); - return new GridHadoopFileBlock(hosts, uri, start, len); + return new HadoopFileBlock(hosts, uri, start, len); } /** @@ -586,12 +586,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel /** * Mocked job. */ - private static class MockJob implements GridHadoopJob { + private static class MockJob implements HadoopJob { /** Reducers count. */ private final int reducers; /** */ - private Collection<GridHadoopInputSplit> splitList; + private Collection<HadoopInputSplit> splitList; /** * Constructor. @@ -599,7 +599,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel * @param reducers Reducers count. * @param splitList Splits. */ - private MockJob(int reducers, Collection<GridHadoopInputSplit> splitList) { + private MockJob(int reducers, Collection<HadoopInputSplit> splitList) { this.reducers = reducers; this.splitList = splitList; } @@ -619,12 +619,12 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel } /** {@inheritDoc} */ - @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException { + @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { return splitList; } /** {@inheritDoc} */ - @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { + @Override public HadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { return null; } @@ -936,7 +936,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSel } /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { + @Override public Hadoop hadoop() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java new file mode 100644 index 0000000..8cf31a2 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.net.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test file systems for the working directory multi-threading support. + */ +public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { + private static final int THREAD_COUNT = 3; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + + /** + * Test the file system with specified URI for the multi-thread working directory support. + * + * @param uri Base URI of the file system (scheme and authority). + * @throws Exception If fails. + */ + private void testFileSystem(final URI uri) throws Exception { + final Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, + new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); + + final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); + final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT); + + final Path[] newUserInitWorkDir = new Path[THREAD_COUNT]; + final Path[] newWorkDir = new Path[THREAD_COUNT]; + final Path[] newAbsWorkDir = new Path[THREAD_COUNT]; + final Path[] newInstanceWorkDir = new Path[THREAD_COUNT]; + + final AtomicInteger threadNum = new AtomicInteger(0); + + GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + int curThreadNum = threadNum.getAndIncrement(); + + FileSystem fs = FileSystem.get(uri, cfg); + + HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); + + if ("file".equals(uri.getScheme())) + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); + + changeUserPhase.countDown(); + changeUserPhase.await(); + + newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum)); + + changeDirPhase.countDown(); + changeDirPhase.await(); + + newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum)); + + changeAbsDirPhase.countDown(); + changeAbsDirPhase.await(); + + newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); + + newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory(); + + finishPhase.countDown(); + } + catch (InterruptedException | IOException e) { + error("Failed to execute test thread.", e); + + fail(); + } + } + }, THREAD_COUNT, "filesystems-test"); + + finishPhase.await(); + + for (int i = 0; i < THREAD_COUNT; i ++) { + cfg.set(MRJobConfig.USER_NAME, "user" + i); + + Path workDir = new Path(new Path(uri), "user/user" + i); + + 
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString()); + + assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory()); + + assertEquals(workDir, newUserInitWorkDir[i]); + + assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]); + + assertEquals(new Path("/folder" + i), newAbsWorkDir[i]); + + assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]); + } + + System.out.println(System.getProperty("user.dir")); + } + + /** + * Test IGFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testIgfs() throws Exception { + testFileSystem(URI.create(igfsScheme())); + } + + /** + * Test HDFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testHdfs() throws Exception { + testFileSystem(URI.create("hdfs://localhost/")); + } + + /** + * Test LocalFS multi-thread working directory. + * + * @throws Exception If fails. + */ + public void testLocal() throws Exception { + testFileSystem(URI.create("file:///")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java new file mode 100644 index 0000000..a6c29e9 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopGroupingTest.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Grouping test. 
+ */ +public class HadoopGroupingTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** */ + private static final GridConcurrentHashSet<UUID> vals = GridHadoopSharedMap.map(HadoopGroupingTest.class) + .put("vals", new GridConcurrentHashSet<UUID>()); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + protected boolean igfsEnabled() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGroupingReducer() throws Exception { + doTestGrouping(false); + } + + /** + * @throws Exception If failed. + */ + public void testGroupingCombiner() throws Exception { + doTestGrouping(true); + } + + /** + * @param combiner With combiner. + * @throws Exception If failed. + */ + public void doTestGrouping(boolean combiner) throws Exception { + vals.clear(); + + Job job = Job.getInstance(); + + job.setInputFormatClass(InFormat.class); + job.setOutputFormatClass(OutFormat.class); + + job.setOutputKeyClass(YearTemperature.class); + job.setOutputValueClass(Text.class); + + job.setMapperClass(Mapper.class); + + if (combiner) { + job.setCombinerClass(MyReducer.class); + job.setNumReduceTasks(0); + job.setCombinerKeyGroupingComparatorClass(YearComparator.class); + } + else { + job.setReducerClass(MyReducer.class); + job.setNumReduceTasks(4); + job.setGroupingComparatorClass(YearComparator.class); + } + + grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), + createJobInfo(job.getConfiguration())).get(30000); + + assertTrue(vals.isEmpty()); + } + + public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> { + /** */ + int lastYear; + + @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context) + throws IOException, InterruptedException { + X.println("___ : " + context.getTaskAttemptID() + " --> " + key); + + Set<UUID> ids = new HashSet<>(); + + for (Text val : vals0) + assertTrue(ids.add(UUID.fromString(val.toString()))); + + for (Text val : vals0) + assertTrue(ids.remove(UUID.fromString(val.toString()))); + + assertTrue(ids.isEmpty()); + + assertTrue(key.year > lastYear); + + lastYear = key.year; + + for (Text val : vals0) + assertTrue(vals.remove(UUID.fromString(val.toString()))); + } + } + + public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator. 
+ /** {@inheritDoc} */ + @Override public int compare(YearTemperature o1, YearTemperature o2) { + return Integer.compare(o1.year, o2.year); + } + + /** {@inheritDoc} */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + throw new IllegalStateException(); + } + } + + public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable { + /** */ + private int year; + + /** */ + private int temperature; + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + out.writeInt(year); + out.writeInt(temperature); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + year = in.readInt(); + temperature = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + throw new IllegalStateException(); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { // To be partitioned by year. + return year; + } + + /** {@inheritDoc} */ + @Override public int compareTo(YearTemperature o) { + int res = Integer.compare(year, o.year); + + if (res != 0) + return res; + + // Sort comparator by year and temperature, to find max for year. + return Integer.compare(o.temperature, temperature); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(YearTemperature.class, this); + } + } + + public static class InFormat extends InputFormat<YearTemperature, Text> { + /** {@inheritDoc} */ + @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + ArrayList<InputSplit> list = new ArrayList<>(); + + for (int i = 0; i < 10; i++) + list.add(new HadoopSortingTest.FakeSplit(20)); + + return list; + } + + /** {@inheritDoc} */ + @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split, + TaskAttemptContext context) throws IOException, InterruptedException { + return new RecordReader<YearTemperature, Text>() { + /** */ + int cnt; + + /** */ + Random rnd = new GridRandom(); + + /** */ + YearTemperature key = new YearTemperature(); + + /** */ + Text val = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext context) { + // No-op. + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + return cnt++ < split.getLength(); + } + + @Override public YearTemperature getCurrentKey() { + key.year = 1990 + rnd.nextInt(10); + key.temperature = 10 + rnd.nextInt(20); + + return key; + } + + @Override public Text getCurrentValue() { + UUID id = UUID.randomUUID(); + + assertTrue(vals.add(id)); + + val.set(id.toString()); + + return val; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + // No-op. + } + }; + } + } + + /** + * + */ + public static class OutFormat extends OutputFormat { + /** {@inheritDoc} */ + @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + + /** {@inheritDoc} */ + @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java new file mode 100644 index 0000000..ed6d0a0 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopJobTrackerSelfTest.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Job tracker self test. + */ +public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** Test block count parameter name. */ + private static final int BLOCK_CNT = 10; + + /** */ + private static GridHadoopSharedMap m = GridHadoopSharedMap.map(HadoopJobTrackerSelfTest.class); + + /** Map task execution count. */ + private static final AtomicInteger mapExecCnt = m.put("mapExecCnt", new AtomicInteger()); + + /** Reduce task execution count. */ + private static final AtomicInteger reduceExecCnt = m.put("reduceExecCnt", new AtomicInteger()); + + /** Reduce task execution count. 
*/ + private static final AtomicInteger combineExecCnt = m.put("combineExecCnt", new AtomicInteger()); + + /** */ + private static final Map<String, CountDownLatch> latch = m.put("latch", new HashMap<String, CountDownLatch>()); + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + latch.put("mapAwaitLatch", new CountDownLatch(1)); + latch.put("reduceAwaitLatch", new CountDownLatch(1)); + latch.put("combineAwaitLatch", new CountDownLatch(1)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + mapExecCnt.set(0); + combineExecCnt.set(0); + reduceExecCnt.set(0); + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setMapReducePlanner(new GridHadoopTestRoundRobinMrPlanner()); + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSimpleTaskSubmit() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1")); + + GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(0, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTaskWithCombinerPerMap() throws Exception { + try { + UUID globalId = UUID.randomUUID(); + + Job job = Job.getInstance(); + setupFileSystems(job.getConfiguration()); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + job.setCombinerClass(TestCombiner.class); + job.setInputFormatClass(InFormat.class); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2")); + + GridHadoopJobId jobId = new GridHadoopJobId(globalId, 1); + + grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + checkStatus(jobId, false); + + info("Releasing map latch."); + + latch.get("mapAwaitLatch").countDown(); + + checkStatus(jobId, false); + + // All maps are completed. We have a combiner, so no reducers should be executed + // before combiner latch is released. 
+ + U.sleep(50); + + assertEquals(0, reduceExecCnt.get()); + + info("Releasing combiner latch."); + + latch.get("combineAwaitLatch").countDown(); + + checkStatus(jobId, false); + + info("Releasing reduce latch."); + + latch.get("reduceAwaitLatch").countDown(); + + checkStatus(jobId, true); + + assertEquals(10, mapExecCnt.get()); + assertEquals(10, combineExecCnt.get()); + assertEquals(1, reduceExecCnt.get()); + } + finally { + // Safety. + latch.get("mapAwaitLatch").countDown(); + latch.get("combineAwaitLatch").countDown(); + latch.get("reduceAwaitLatch").countDown(); + } + } + + /** + * Checks job execution status. + * + * @param jobId Job ID. + * @param complete Completion status. + * @throws Exception If failed. + */ + private void checkStatus(GridHadoopJobId jobId, boolean complete) throws Exception { + for (int i = 0; i < gridCount(); i++) { + IgniteKernal kernal = (IgniteKernal)grid(i); + + Hadoop hadoop = kernal.hadoop(); + + GridHadoopJobStatus stat = hadoop.status(jobId); + + assert stat != null; + + IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId); + + if (!complete) + assertFalse(fut.isDone()); + else { + info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + + kernal.getLocalNodeId() + ']'); + + fut.get(); + } + } + } + + /** + * Test input format + */ + public static class InFormat extends InputFormat { + + @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { + List<InputSplit> res = new ArrayList<>(BLOCK_CNT); + + for (int i = 0; i < BLOCK_CNT; i++) + try { + res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"})); + } + catch (URISyntaxException e) { + throw new IOException(e); + } + + return res; + } + + @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException { + return new RecordReader() { + @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { + } + + @Override public boolean nextKeyValue() { + return false; + } + + @Override public Object getCurrentKey() { + return null; + } + + @Override public Object getCurrentValue() { + return null; + } + + @Override public float getProgress() { + return 0; + } + + @Override public void close() { + + } + }; + } + } + + /** + * Test mapper. + */ + private static class TestMapper extends Mapper { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("mapAwaitLatch").await(); + + mapExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test reducer. + */ + private static class TestReducer extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("reduceAwaitLatch").await(); + + reduceExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } + + /** + * Test combiner. 
+ */ + private static class TestCombiner extends Reducer { + @Override public void run(Context ctx) throws IOException, InterruptedException { + System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId()); + + latch.get("combineAwaitLatch").await(); + + combineExecCnt.incrementAndGet(); + + System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java new file mode 100644 index 0000000..f86c608 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceEmbeddedSelfTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests map-reduce execution with embedded mode. + */ +public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest { + /** */ + private static Map<String, Boolean> flags = GridHadoopSharedMap.map(HadoopMapReduceEmbeddedSelfTest.class) + .put("flags", new HashMap<String, Boolean>()); + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * Tests whole job execution with all phases in old and new versions of API with definition of custom + * Serialization, Partitioner and IO formats. + * @throws Exception If fails. 
+ */ + public void testMultiReducerWholeMapReduceExecution() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + + generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000, + "key6", 18000 ); + + for (int i = 0; i < 2; i++) { + boolean useNewAPI = i == 1; + + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + flags.put("serializationWasConfigured", false); + flags.put("partitionerWasConfigured", false); + flags.put("inputFormatWasConfigured", false); + flags.put("outputFormatWasConfigured", false); + + JobConf jobConf = new JobConf(); + + jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); + + //To split into about 6-7 items for v2 + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + //For v1 + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. + setupFileSystems(jobConf); + + GridHadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI); + + if (!useNewAPI) { + jobConf.setPartitionerClass(CustomV1Partitioner.class); + jobConf.setInputFormat(CustomV1InputFormat.class); + jobConf.setOutputFormat(CustomV1OutputFormat.class); + } + + Job job = Job.getInstance(jobConf); + + GridHadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI); + + if (useNewAPI) { + job.setPartitionerClass(CustomV2Partitioner.class); + job.setInputFormatClass(CustomV2InputFormat.class); + job.setOutputFormatClass(CustomV2OutputFormat.class); + } + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setNumReduceTasks(3); + + job.setJarByClass(GridHadoopWordCount2.class); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + + assertTrue("Serialization was configured (new API is " + useNewAPI + ")", + flags.get("serializationWasConfigured")); + + assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")", + flags.get("partitionerWasConfigured")); + + assertTrue("Input format was configured (new API is = " + useNewAPI + ")", + flags.get("inputFormatWasConfigured")); + + assertTrue("Output format was configured (new API is = " + useNewAPI + ")", + flags.get("outputFormatWasConfigured")); + + assertEquals("Use new API = " + useNewAPI, + "key3\t15000\n" + + "key6\t18000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000") + ); + + assertEquals("Use new API = " + useNewAPI, + "key1\t10000\n" + + "key4\t7000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001") + ); + + assertEquals("Use new API = " + useNewAPI, + "key2\t20000\n" + + "key5\t12000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002") + ); + + } + } + + /** + * Custom serialization class that inherits behaviour of native {@link WritableSerialization}. + */ + protected static class CustomSerialization extends WritableSerialization { + @Override public void setConf(Configuration conf) { + super.setConf(conf); + + flags.put("serializationWasConfigured", true); + } + } + + /** + * Custom implementation of Partitioner in v1 API. 
+ */ + private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + flags.put("partitionerWasConfigured", true); + } + } + + /** + * Custom implementation of Partitioner in v2 API. + */ + private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner + implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("partitionerWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of InputFormat in v2 API. + */ + private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("inputFormatWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of OutputFormat in v2 API. + */ + private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable { + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + flags.put("outputFormatWasConfigured", true); + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + } + + /** + * Custom implementation of InputFormat in v1 API. + */ + private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + super.configure(job); + + flags.put("inputFormatWasConfigured", true); + } + } + + /** + * Custom implementation of OutputFormat in v1 API. + */ + private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable { + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + flags.put("outputFormatWasConfigured", true); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java new file mode 100644 index 0000000..486b856 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Test of whole cycle of map-reduce processing via Job tracker. + */ +public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * Tests whole job execution with all phases in all combination of new and old versions of API. + * @throws Exception If fails. + */ + public void testWholeMapReduceExecution() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + + generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 ); + + for (int i = 0; i < 8; i++) { + igfs.delete(new IgfsPath(PATH_OUTPUT), true); + + boolean useNewMapper = (i & 1) == 0; + boolean useNewCombiner = (i & 2) == 0; + boolean useNewReducer = (i & 4) == 0; + + JobConf jobConf = new JobConf(); + + jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); + jobConf.setUser("yyy"); + jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); + + //To split into about 40 items for v2 + jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000); + + //For v1 + jobConf.setInt("fs.local.block.size", 65000); + + // File system coordinates. + setupFileSystems(jobConf); + + GridHadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer); + + Job job = Job.getInstance(jobConf); + + GridHadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString())); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setJarByClass(GridHadoopWordCount2.class); + + GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + + checkJobStatistics(jobId); + + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + + useNewReducer, + "blue\t200000\n" + + "green\t150000\n" + + "red\t100000\n" + + "yellow\t70000\n", + readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000") + ); + } + } + + /** + * Simple test job statistics. + * + * @param jobId Job id. 
+ * @throws IgniteCheckedException + */ + private void checkJobStatistics(GridHadoopJobId jobId) throws IgniteCheckedException, IOException { + GridHadoopCounters cntrs = grid(0).hadoop().counters(jobId); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); + + Map<String, SortedMap<Integer,Long>> tasks = new TreeMap<>(); + + Map<String, Integer> phaseOrders = new HashMap<>(); + phaseOrders.put("submit", 0); + phaseOrders.put("prepare", 1); + phaseOrders.put("start", 2); + phaseOrders.put("Cstart", 3); + phaseOrders.put("finish", 4); + + String prevTaskId = null; + + long apiEvtCnt = 0; + + for (T2<String, Long> evt : perfCntr.evts()) { + //We expect string pattern: COMBINE 1 run 7fa86a14-5a08-40e3-a7cb-98109b52a706 + String[] parsedEvt = evt.get1().split(" "); + + String taskId; + String taskPhase; + + if ("JOB".equals(parsedEvt[0])) { + taskId = parsedEvt[0]; + taskPhase = parsedEvt[1]; + } + else { + taskId = ("COMBINE".equals(parsedEvt[0]) ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1]; + taskPhase = ("COMBINE".equals(parsedEvt[0]) ? "C" : "") + parsedEvt[2]; + } + + if (!taskId.equals(prevTaskId)) + tasks.put(taskId, new TreeMap<Integer,Long>()); + + Integer pos = phaseOrders.get(taskPhase); + + assertNotNull("Invalid phase " + taskPhase, pos); + + tasks.get(taskId).put(pos, evt.get2()); + + prevTaskId = taskId; + + apiEvtCnt++; + } + + for (Map.Entry<String ,SortedMap<Integer,Long>> task : tasks.entrySet()) { + Map<Integer, Long> order = task.getValue(); + + long prev = 0; + + for (Map.Entry<Integer, Long> phase : order.entrySet()) { + assertTrue("Phase order of " + task.getKey() + " is invalid", phase.getValue() >= prev); + + prev = phase.getValue(); + } + } + + final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return igfs.exists(statPath); + } + }, 10000); + + BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath))); + + assertEquals(apiEvtCnt, GridHadoopTestUtils.simpleCheckJobStatFile(reader)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java index 116248f..5d5bb94 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java @@ -34,7 +34,7 @@ public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest { * @throws Exception If fails. */ public void testIntWritableSerialization() throws Exception { - GridHadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class); + HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class); ByteArrayOutputStream buf = new ByteArrayOutputStream(); @@ -56,7 +56,7 @@ public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest { * @throws Exception If fails. 
*/ public void testIntJavaSerialization() throws Exception { - GridHadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class); + HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class); ByteArrayOutputStream buf = new ByteArrayOutputStream(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java new file mode 100644 index 0000000..76357c0 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +/** + * External test for sorting. + */ +public class HadoopSortingExternalTest extends HadoopSortingTest { + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(true); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java new file mode 100644 index 0000000..5d28a30 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests correct sorting. + */ +public class HadoopSortingTest extends HadoopAbstractSelfTest { + /** */ + private static final String PATH_INPUT = "/test-in"; + + /** */ + private static final String PATH_OUTPUT = "/test-out"; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** + * @return {@code True} if IGFS is enabled on Hadoop nodes. + */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testSortSimple() throws Exception { + // Generate test data. + Job job = Job.getInstance(); + + job.setInputFormatClass(InFormat.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + + job.setMapperClass(Mapper.class); + job.setNumReduceTasks(0); + + setupFileSystems(job.getConfiguration()); + + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT)); + + X.printerrln("Data generation started."); + + grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())).get(180000); + + X.printerrln("Data generation complete."); + + // Run main map-reduce job. + job = Job.getInstance(); + + setupFileSystems(job.getConfiguration()); + + job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() + + "," + WritableSerialization.class.getName()); + + FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT)); + + job.setSortComparatorClass(JavaSerializationComparator.class); + + job.setMapperClass(MyMapper.class); + job.setReducerClass(MyReducer.class); + + job.setNumReduceTasks(2); + + job.setMapOutputKeyClass(UUID.class); + job.setMapOutputValueClass(NullWritable.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + + X.printerrln("Job started."); + + grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), + createJobInfo(job.getConfiguration())).get(180000); + + X.printerrln("Job complete."); + + // Check result. 
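+ // Each reducer's output must come out in ascending natural order of UUID: the job uses
+ // JavaSerializationComparator, which deserializes the raw key bytes and delegates to Comparable.compareTo().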
+ Path outDir = new Path(igfsScheme() + PATH_OUTPUT); + + AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration()); + + for (FileStatus file : fs.listStatus(outDir)) { + X.printerrln("__ file: " + file); + + if (file.getLen() == 0) + continue; + + FSDataInputStream in = fs.open(file.getPath()); + + Scanner sc = new Scanner(in); + + UUID prev = null; + + while(sc.hasNextLine()) { + UUID next = UUID.fromString(sc.nextLine()); + +// X.printerrln("___ check: " + next); + + if (prev != null) + assertTrue(prev.compareTo(next) < 0); + + prev = next; + } + } + } + + public static class InFormat extends InputFormat<Text, NullWritable> { + /** {@inheritDoc} */ + @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException { + List<InputSplit> res = new ArrayList<>(); + + FakeSplit split = new FakeSplit(20); + + for (int i = 0; i < 10; i++) + res.add(split); + + return res; + } + + /** {@inheritDoc} */ + @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split, + TaskAttemptContext ctx) throws IOException, InterruptedException { + return new RecordReader<Text, NullWritable>() { + /** */ + int cnt; + + /** */ + Text txt = new Text(); + + @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { + // No-op. + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + return ++cnt <= split.getLength(); + } + + @Override public Text getCurrentKey() { + txt.set(UUID.randomUUID().toString()); + +// X.printerrln("___ read: " + txt); + + return txt; + } + + @Override public NullWritable getCurrentValue() { + return NullWritable.get(); + } + + @Override public float getProgress() throws IOException, InterruptedException { + return (float)cnt / split.getLength(); + } + + @Override public void close() { + // No-op. + } + }; + } + } + + public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> { + /** {@inheritDoc} */ + @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException { +// X.printerrln("___ map: " + val); + + ctx.write(UUID.fromString(val.toString()), NullWritable.get()); + } + } + + public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> { + /** */ + private Text text = new Text(); + + /** {@inheritDoc} */ + @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx) + throws IOException, InterruptedException { +// X.printerrln("___ rdc: " + key); + + text.set(key.toString()); + + ctx.write(text, NullWritable.get()); + } + } + + public static class FakeSplit extends InputSplit implements Writable { + /** */ + private static final String[] HOSTS = {"127.0.0.1"}; + + /** */ + private int len; + + /** + * @param len Length. + */ + public FakeSplit(int len) { + this.len = len; + } + + /** + * + */ + public FakeSplit() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long getLength() throws IOException, InterruptedException { + return len; + } + + /** {@inheritDoc} */ + @Override public String[] getLocations() throws IOException, InterruptedException { + return HOSTS; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + out.writeInt(len); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + len = in.readInt(); + } + } +}
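Editorial note: the ordering check at the end of testSortSimple above can be factored into a small standalone helper. The sketch below is illustrative only and is not part of the patch; the class name SortedOutputCheck and the method assertAscendingUuids are hypothetical, and only plain JDK types are used.

import java.io.BufferedReader;
import java.io.IOException;
import java.util.UUID;

/**
 * Illustrative helper (hypothetical, not part of the patch): verifies that every line of a
 * reducer output file parses as a UUID and that the UUIDs appear in strictly ascending
 * natural order, mirroring the manual check in HadoopSortingTest#testSortSimple.
 */
public final class SortedOutputCheck {
    private SortedOutputCheck() {
        // No instances.
    }

    /**
     * @param reader Reader over one reducer output file (one UUID per line).
     * @throws IOException If reading fails.
     * @throws AssertionError If any key is out of order.
     */
    public static void assertAscendingUuids(BufferedReader reader) throws IOException {
        UUID prev = null;

        String line;

        while ((line = reader.readLine()) != null) {
            UUID next = UUID.fromString(line);

            if (prev != null && prev.compareTo(next) >= 0)
                throw new AssertionError("Keys are out of order: " + prev + " >= " + next);

            prev = next;
        }
    }
}

In the test above such a helper would be fed one reader per non-empty part file, e.g. new BufferedReader(new InputStreamReader(fs.open(file.getPath()))), instead of the inline Scanner loop.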