http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java deleted file mode 100644 index 8319255..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractSelfTest.java +++ /dev/null @@ -1,222 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop;

import org.apache.hadoop.conf.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.*;
import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.testframework.junits.common.*;

import java.io.*;

import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;

/**
 * Abstract class for Hadoop tests.
 * <p>
 * Builds the node configuration used by Hadoop tests: Hadoop settings, TCP communication and discovery,
 * and, when the corresponding hooks return {@code true}, IGFS with its caches and a REST connector.
 */
public abstract class GridHadoopAbstractSelfTest extends GridCommonAbstractTest {
    /** IP finder installed into the discovery SPI of every node configured by this class. */
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

    /** REST port. */
    protected static final int REST_PORT = 11212;

    /** IGFS name. */
    protected static final String igfsName = null;

    /** IGFS meta cache name. */
    protected static final String igfsMetaCacheName = "meta";

    /** IGFS data cache name. */
    protected static final String igfsDataCacheName = "data";

    /** IGFS block size. */
    protected static final int igfsBlockSize = 1024;

    /** IGFS block group size. */
    protected static final int igfsBlockGroupSize = 8;

    /** Next REST port to use; starts at {@link #REST_PORT} and grows by one per REST-enabled configuration. */
    private int restPort = REST_PORT;

    /** Initial classpath. */
    private static String initCp;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        // Add surefire classpath to regular classpath.
        initCp = System.getProperty("java.class.path");

        String surefireCp = System.getProperty("surefire.test.class.path");

        if (surefireCp != null)
            System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);

        super.beforeTestsStarted();
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        try {
            super.afterTestsStopped();
        }
        finally {
            // Restore classpath even if the parent tear-down failed. The null check is required because
            // System.setProperty() rejects null values (e.g. when the set-up never recorded a classpath).
            if (initCp != null)
                System.setProperty("java.class.path", initCp);

            initCp = null;
        }
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setHadoopConfiguration(hadoopConfiguration(gridName));

        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();

        commSpi.setSharedMemoryPort(-1);

        cfg.setCommunicationSpi(commSpi);

        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();

        discoSpi.setIpFinder(IP_FINDER);

        if (igfsEnabled()) {
            cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration());

            cfg.setIgfsConfiguration(igfsConfiguration());
        }

        if (restEnabled()) {
            ConnectorConfiguration clnCfg = new ConnectorConfiguration();

            // Every REST-enabled node gets its own port.
            clnCfg.setPort(restPort++);

            cfg.setConnectorConfiguration(clnCfg);
        }

        cfg.setLocalHost("127.0.0.1");
        cfg.setPeerClassLoadingEnabled(false);

        return cfg;
    }

    /**
     * @param gridName Grid name.
     * @return Hadoop configuration.
     */
    public GridHadoopConfiguration hadoopConfiguration(String gridName) {
        GridHadoopConfiguration cfg = new GridHadoopConfiguration();

        cfg.setMaxParallelTasks(3);

        return cfg;
    }

    /**
     * @return IGFS configuration.
     */
    public IgfsConfiguration igfsConfiguration() {
        IgfsConfiguration cfg = new IgfsConfiguration();

        cfg.setName(igfsName);
        cfg.setBlockSize(igfsBlockSize);
        cfg.setDataCacheName(igfsDataCacheName);
        cfg.setMetaCacheName(igfsMetaCacheName);
        cfg.setFragmentizerEnabled(false);

        return cfg;
    }

    /**
     * @return IGFS meta cache configuration.
     */
    public CacheConfiguration metaCacheConfiguration() {
        CacheConfiguration cfg = new CacheConfiguration();

        cfg.setName(igfsMetaCacheName);
        cfg.setCacheMode(REPLICATED);
        cfg.setAtomicityMode(TRANSACTIONAL);
        cfg.setWriteSynchronizationMode(FULL_SYNC);

        return cfg;
    }

    /**
     * @return IGFS data cache configuration.
     */
    private CacheConfiguration dataCacheConfiguration() {
        CacheConfiguration cfg = new CacheConfiguration();

        cfg.setName(igfsDataCacheName);
        cfg.setCacheMode(PARTITIONED);
        cfg.setAtomicityMode(TRANSACTIONAL);
        cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize));
        cfg.setWriteSynchronizationMode(FULL_SYNC);

        return cfg;
    }

    /**
     * @return {@code True} if IGFS is enabled on Hadoop nodes.
     */
    protected boolean igfsEnabled() {
        return false;
    }

    /**
     * @return {@code True} if REST is enabled on Hadoop nodes.
     */
    protected boolean restEnabled() {
        return false;
    }

    /**
     * @return Number of nodes to start.
     */
    protected int gridCount() {
        return 3;
    }

    /**
     * Points the given Hadoop configuration at the test IGFS: sets the default file system URI and the
     * {@code igfs} scheme implementations, then delegates to {@link GridHadoopFileSystemsUtils}.
     *
     * @param cfg Config.
     */
    protected void setupFileSystems(Configuration cfg) {
        cfg.set("fs.defaultFS", igfsScheme());
        cfg.set("fs.igfs.impl", org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem.class.getName());
        cfg.set("fs.AbstractFileSystem.igfs.impl", IgfsHadoopFileSystem.class.getName());

        GridHadoopFileSystemsUtils.setupFileSystems(cfg);
    }

    /**
     * @return IGFS scheme for test.
     */
    protected String igfsScheme() {
        return "igfs://:" + getTestGridName(0) + "@/";
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java deleted file mode 100644 index ebbc0a6..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopAbstractWordCountTest.java +++ /dev/null @@ -1,138 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop;

import com.google.common.base.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.igfs.*;

import java.io.*;
import java.util.*;

/**
 * Abstract class for tests based on WordCount test job.
 */
public abstract class GridHadoopAbstractWordCountTest extends GridHadoopAbstractSelfTest {
    /** Input path. */
    protected static final String PATH_INPUT = "/input";

    /** Output path. */
    protected static final String PATH_OUTPUT = "/output";

    /** IGFS instance. */
    protected IgfsEx igfs;

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        Configuration cfg = new Configuration();

        setupFileSystems(cfg);

        // Init cache by correct LocalFileSystem implementation
        FileSystem.getLocal(cfg);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids(true);
    }

    /** {@inheritDoc} */
    @Override protected boolean igfsEnabled() {
        return true;
    }

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return 1;
    }

    /**
     * Generates test file: every word is repeated the requested number of times, the words are shuffled
     * and written to IGFS as space-separated lines of 5 to 9 words each.
     *
     * @param path File name.
     * @param wordCounts Words and counts as alternating {@code String} and {@code Integer} values.
     * @throws IllegalArgumentException If {@code wordCounts} does not consist of complete pairs.
     * @throws Exception If failed.
     */
    protected void generateTestFile(String path, Object... wordCounts) throws Exception {
        if (wordCounts.length % 2 != 0)
            throw new IllegalArgumentException("Expected (word, count) pairs, got odd number of arguments: " +
                wordCounts.length);

        List<String> words = new ArrayList<>();

        // Generating.
        for (int i = 0; i < wordCounts.length; i += 2) {
            String word = (String)wordCounts[i];
            int cnt = (Integer)wordCounts[i + 1];

            while (cnt-- > 0)
                words.add(word);
        }

        // Shuffling (uniform, unlike swapping each element with an arbitrary position).
        Collections.shuffle(words);

        // Input file preparing; the writer is closed even if writing fails.
        try (PrintWriter writer = new PrintWriter(igfs.create(new IgfsPath(path), true))) {
            int pos = 0;

            while (pos < words.size()) {
                int lineLen = 5 + (int)(Math.random() * 5);

                List<String> lineWords = words.subList(pos, Math.min(pos + lineLen, words.size()));

                pos += lineLen;

                writer.println(Joiner.on(' ').join(lineWords));
            }
        }
    }

    /**
     * Reads all lines of a text file from IGFS, sorts them and joins them back into a single String.
     *
     * @param fileName Name of the file to read.
     * @return Sorted lines of the file, each one (including the last) terminated by {@code '\n'}.
     * @throws Exception If could not read the file.
     */
    protected String readAndSortFile(String fileName) throws Exception {
        List<String> lines = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath(fileName))))) {
            String line;

            while ((line = reader.readLine()) != null)
                lines.add(line);
        }

        Collections.sort(lines);

        return Joiner.on('\n').join(lines) + "\n";
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java deleted file mode 100644 index 767be7c..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoaderTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.ignite.internal.processors.hadoop;

import junit.framework.*;
import org.apache.hadoop.mapreduce.*;

/**
 * Checks which of the nested sample classes {@link GridHadoopClassLoader} returns as a different
 * {@link Class} object and which one it returns unchanged.
 */
public class GridHadoopClassLoaderTest extends TestCase {
    /** Class loader under test. */
    GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);

    /**
     * {@code Test1} and {@code Test2} must come back as new class objects, {@code Test3} as the very same one.
     *
     * @throws Exception If failed.
     */
    public void testClassLoading() throws Exception {
        Class<?> reloaded1 = loadByName(Test1.class);

        assertNotSame(Test1.class, reloaded1);

        Class<?> reloaded2 = loadByName(Test2.class);

        assertNotSame(Test2.class, reloaded2);

        Class<?> reloaded3 = loadByName(Test3.class);

        assertSame(Test3.class, reloaded3);
    }

//    public void testDependencySearch() {
//        assertTrue(ldr.hasExternalDependencies(Test1.class.getName(), new HashSet<String>()));
//        assertTrue(ldr.hasExternalDependencies(Test2.class.getName(), new HashSet<String>()));
//    }

    /**
     * Asks the loader under test for the class with the same name as the given one.
     *
     * @param cls Class whose name is looked up.
     * @return Class returned by the loader under test.
     * @throws ClassNotFoundException If the loader cannot find the class.
     */
    private Class<?> loadByName(Class<?> cls) throws ClassNotFoundException {
        return ldr.loadClass(cls.getName());
    }

    /**
     * Sample class referring to {@link Test2} and to the Hadoop {@link Job} type.
     */
    private static class Test1 {
        /** */
        Test2 t2;

        /** */
        Job[][] jobs = new Job[4][4];
    }

    /**
     * Sample class referring back to {@link Test1}.
     */
    private static abstract class Test2 {
        /** */
        abstract Test1 t1();
    }

    /**
     * Sample class without any members.
     */
    private static class Test3 {
        // No-op.
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java deleted file mode 100644 index 80cd226..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopCommandLineTest.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */

package org.apache.ignite.internal.processors.hadoop;

import com.google.common.base.*;
import org.apache.ignite.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.processors.hadoop.counter.*;
import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.junits.common.*;
import org.jdk8.backport.*;

import java.io.*;
import java.nio.file.*;
import java.util.*;

/**
 * Test of integration with Hadoop client via command line interface.
 * <p>
 * Requires {@code HADOOP_HOME} and {@code HIVE_HOME} to be set; the external {@code bin/hadoop} and
 * {@code bin/hive} tools are started as child processes against a temporary configuration directory.
 */
public class GridHadoopCommandLineTest extends GridCommonAbstractTest {
    /** IGFS instance. */
    private IgfsEx igfs;

    /** IGFS name. */
    private static final String igfsName = "igfs";

    /** Temporary directory with the Hadoop configuration and the generated test data. */
    private static File testWorkDir;

    /** Hadoop installation directory. */
    private static String hadoopHome;

    /** Hive installation directory. */
    private static String hiveHome;

    /** Jar with Hadoop map-reduce examples. */
    private static File examplesJar;

    /**
     * Generates test file: every word is repeated the requested number of times, the words are shuffled
     * and written as space-separated lines of 5 to 9 words each.
     *
     * @param path File name.
     * @param wordCounts Words and counts as alternating {@code String} and {@code Integer} values.
     * @throws IllegalArgumentException If {@code wordCounts} does not consist of complete pairs.
     * @throws Exception If failed.
     */
    private void generateTestFile(File path, Object... wordCounts) throws Exception {
        if (wordCounts.length % 2 != 0)
            throw new IllegalArgumentException("Expected (word, count) pairs, got odd number of arguments: " +
                wordCounts.length);

        List<String> words = new ArrayList<>();

        // Generating.
        for (int i = 0; i < wordCounts.length; i += 2) {
            String word = (String)wordCounts[i];
            int cnt = (Integer)wordCounts[i + 1];

            while (cnt-- > 0)
                words.add(word);
        }

        // Shuffling (uniform, unlike swapping each element with an arbitrary position).
        Collections.shuffle(words);

        // Writing file.
        try (PrintWriter writer = new PrintWriter(path)) {
            int pos = 0;

            while (pos < words.size()) {
                int lineLen = 5 + (int)(Math.random() * 5);

                List<String> lineWords = words.subList(pos, Math.min(pos + lineLen, words.size()));

                pos += lineLen;

                writer.println(Joiner.on(' ').join(lineWords));
            }

            writer.flush();
        }
    }

    /**
     * Generates two data files to join them with Hive.
     * <p>
     * {@code data-a} gets 1000 rows {@code (i, 2 * i)}; {@code data-b} gets 2000 rows
     * {@code (k, 1002 + 2 * k)}. Columns are separated by a tab.
     *
     * @throws FileNotFoundException If failed.
     */
    private void generateHiveTestFiles() throws FileNotFoundException {
        try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a"));
            PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) {
            char sep = '\t';

            int idB = 0;
            int idA = 0;
            int v = 1000;

            for (int i = 0; i < 1000; i++) {
                writerA.print(idA++);
                writerA.print(sep);
                writerA.println(idB);

                // Two rows of B per row of A.
                writerB.print(idB++);
                writerB.print(sep);
                writerB.println(v += 2);

                writerB.print(idB++);
                writerB.print(sep);
                writerB.println(v += 2);
            }

            writerA.flush();
            writerB.flush();
        }
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        hiveHome = IgniteSystemProperties.getString("HIVE_HOME");

        assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));

        hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");

        assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));

        String mapredHome = hadoopHome + "/share/hadoop/mapreduce";

        File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
            @Override public boolean accept(File pathname) {
                return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
                    pathname.getName().endsWith(".jar");
            }
        });

        // File.listFiles() returns null when the directory is missing or cannot be read.
        assertNotNull("Invalid hadoop distribution (cannot list " + mapredHome + ").", fileList);

        assertEquals("Invalid hadoop distribution.", 1, fileList.length);

        examplesJar = fileList[0];

        testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();

        U.copy(U.resolveIgnitePath("docs/core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);

        writeMapredSite(U.resolveIgnitePath("docs/mapred-site.ignite.xml"), new File(testWorkDir, "mapred-site.xml"));

        generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);

        generateHiveTestFiles();
    }

    /**
     * Copies the map-reduce site configuration line by line and inserts the job counter writer property
     * right before the closing {@code </configuration>} tag.
     *
     * @param srcFile Original configuration file.
     * @param dstFile File to write the extended configuration to.
     * @throws IOException If failed.
     */
    private static void writeMapredSite(File srcFile, File dstFile) throws IOException {
        try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
            PrintWriter out = new PrintWriter(dstFile)) {
            String line;

            while ((line = in.readLine()) != null) {
                if (line.startsWith("</configuration>"))
                    out.println(
                        " <property>\n" +
                        " <name>" + GridHadoopUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
                        " <value>" + GridHadoopFSCounterWriter.class.getName() + "</value>\n" +
                        " </property>\n");

                out.println(line);
            }

            out.flush();
        }
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();

        // The directory does not exist if the set-up failed before creating it.
        if (testWorkDir != null)
            U.delete(testWorkDir);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        igfs = (IgfsEx)Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids(true);
    }

    /**
     * @param cls Class.
     * @return Path of the code source location the class was loaded from.
     */
    private static String codeSourcePath(Class<?> cls) {
        return cls.getProtectionDomain().getCodeSource().getLocation().getPath();
    }

    /**
     * Creates the process builder with appropriate environment to run Hadoop CLI.
     *
     * @return Process builder.
     */
    private ProcessBuilder createProcessBuilder() {
        String sep = ":";

        String ggClsPath = codeSourcePath(GridHadoopJob.class) + sep +
            codeSourcePath(GridHadoopJobTracker.class) + sep +
            codeSourcePath(ConcurrentHashMap8.class);

        ProcessBuilder res = new ProcessBuilder();

        res.environment().put("HADOOP_HOME", hadoopHome);
        res.environment().put("HADOOP_CLASSPATH", ggClsPath);
        res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());

        // Standard error is merged into standard output, so a single reader sees everything.
        res.redirectErrorStream(true);

        return res;
    }

    /**
     * Waits for process exit and prints its output to the log.
     *
     * @param proc Process.
     * @return Exit code.
     * @throws Exception If failed.
     */
    private int watchProcess(Process proc) throws Exception {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
            String line;

            while ((line = reader.readLine()) != null)
                log().info(line);
        }

        return proc.waitFor();
    }

    /**
     * Executes Hadoop command line tool.
     *
     * @param args Arguments for Hadoop command line tool.
     * @return Process exit code.
     * @throws Exception If failed.
     */
    private int executeHadoopCmd(String... args) throws Exception {
        ProcessBuilder procBuilder = createProcessBuilder();

        List<String> cmd = new ArrayList<>();

        cmd.add(hadoopHome + "/bin/hadoop");
        cmd.addAll(Arrays.asList(args));

        procBuilder.command(cmd);

        log().info("Execute: " + procBuilder.command());

        return watchProcess(procBuilder.start());
    }

    /**
     * Executes Hive query.
     *
     * @param qry Query.
     * @return Process exit code.
     * @throws Exception If failed.
     */
    private int executeHiveQuery(String qry) throws Exception {
        ProcessBuilder procBuilder = createProcessBuilder();

        List<String> cmd = new ArrayList<>();

        cmd.add(hiveHome + "/bin/hive");

        cmd.add("--hiveconf");
        cmd.add("hive.rpc.query.plan=true");

        cmd.add("--hiveconf");
        cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
            "databaseName=metastore_db;create=true");

        cmd.add("-e");
        cmd.add(qry);

        procBuilder.command(cmd);

        log().info("Execute: " + procBuilder.command());

        return watchProcess(procBuilder.start());
    }

    /**
     * Tests Hadoop command line integration.
     *
     * @throws Exception If failed.
     */
    public void testHadoopCommandLine() throws Exception {
        assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));

        assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));

        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));

        assertTrue(igfs.exists(new IgfsPath("/input/test-data")));

        assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));

        IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");

        assertTrue(igfs.exists(path));

        IgfsPath jobStatPath = null;

        // Exactly one job directory is expected.
        for (IgfsPath jobPath : igfs.listPaths(path)) {
            assertNull(jobStatPath);

            jobStatPath = jobPath;
        }

        assertNotNull("No job statistics directory found under " + path, jobStatPath);

        File locStatFile = new File(testWorkDir, "performance");

        assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString()));

        long evtCnt;

        try (BufferedReader statReader = new BufferedReader(new FileReader(locStatFile))) {
            evtCnt = GridHadoopTestUtils.simpleCheckJobStatFile(statReader);
        }

        assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner.

        assertTrue(igfs.exists(new IgfsPath("/output")));

        List<String> res = new ArrayList<>();

        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000"))))) {
            String line;

            while ((line = in.readLine()) != null)
                res.add(line);
        }

        Collections.sort(res);

        assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
    }

    /**
     * Runs query and checks the result.
     * <p>
     * The query output is materialized into the {@code result} table stored at {@code /result} and the
     * content of its first data file is compared with the expected text.
     *
     * @param expRes Expected result.
     * @param qry Query.
     * @throws Exception If failed.
     */
    private void checkQuery(String expRes, String qry) throws Exception {
        assertEquals(0, executeHiveQuery("drop table if exists result"));

        assertEquals(0, executeHiveQuery(
            "create table result " +
            "row format delimited fields terminated by ' ' " +
            "stored as textfile " +
            "location '/result' as " + qry
        ));

        byte[] buf;

        try (IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0"))) {
            buf = new byte[(int)in.length()];

            // A single read() call is allowed to return fewer bytes than requested.
            new DataInputStream(in).readFully(buf);
        }

        assertEquals(expRes, new String(buf));
    }

    /**
     * Tests Hive integration.
     *
     * @throws Exception If failed.
     */
    public void testHiveCommandLine() throws Exception {
        assertEquals(0, executeHiveQuery(
            "create table table_a (" +
                "id_a int," +
                "id_b int" +
            ") " +
            "row format delimited fields terminated by '\\t' " +
            "stored as textfile " +
            "location '/table-a'"
        ));

        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));

        assertEquals(0, executeHiveQuery(
            "create table table_b (" +
                "id_b int," +
                "rndv int" +
            ") " +
            "row format delimited fields terminated by '\\t' " +
            "stored as textfile " +
            "location '/table-b'"
        ));

        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));

        checkQuery(
            "0 0\n" +
            "1 2\n" +
            "2 4\n" +
            "3 6\n" +
            "4 8\n" +
            "5 10\n" +
            "6 12\n" +
            "7 14\n" +
            "8 16\n" +
            "9 18\n",
            "select * from table_a order by id_a limit 10"
        );

        checkQuery("2000\n", "select count(id_b) from table_b");

        checkQuery(
            "250 500 2002\n" +
            "251 502 2006\n" +
            "252 504 2010\n" +
            "253 506 2014\n" +
            "254 508 2018\n" +
            "255 510 2022\n" +
            "256 512 2026\n" +
            "257 514 2030\n" +
            "258 516 2034\n" +
            "259 518 2038\n",
            "select a.id_a, a.id_b, b.rndv" +
            " from table_a a" +
            " inner join table_b b on a.id_b = b.id_b" +
            " where b.rndv > 2000" +
            " order by a.id_a limit 10"
        );

        checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java deleted file mode 100644 index b1b0275..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultMapReducePlannerSelfTest.java +++ /dev/null @@ -1,1005 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.igfs.mapreduce.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.cluster.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.igfs.*; -import org.apache.ignite.internal.processors.hadoop.planner.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.*; -import org.jetbrains.annotations.*; - -import java.net.*; -import java.util.*; - -/** - * - */ -public class GridHadoopDefaultMapReducePlannerSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static final UUID ID_1 = new UUID(0, 1); - - /** */ - private static final UUID ID_2 = new UUID(0, 2); - - /** */ - private static final UUID ID_3 = new UUID(0, 3); - - /** */ - private static final String HOST_1 = "host1"; - - /** */ - private static final String HOST_2 = "host2"; - - /** */ - private static final String HOST_3 = "host3"; - - /** */ - private static final String INVALID_HOST_1 = "invalid_host1"; - - /** */ - private static final String INVALID_HOST_2 = "invalid_host2"; - - /** */ - private static final String INVALID_HOST_3 = "invalid_host3"; - - /** Mocked Grid. */ - private static final MockIgnite GRID = new MockIgnite(); - - /** Mocked IGFS. */ - private static final IgniteFs IGFS = new MockIgfs(); - - /** Planner. */ - private static final GridHadoopMapReducePlanner PLANNER = new GridHadoopDefaultMapReducePlanner(); - - /** Block locations. */ - private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>(); - - /** Proxy map. */ - private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>(); - - /** Last created plan. 
*/ - private static final ThreadLocal<GridHadoopMapReducePlan> PLAN = new ThreadLocal<>(); - - /** - * - */ - static { - GridTestUtils.setFieldValue(PLANNER, "ignite", GRID); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - GridTestUtils.setFieldValue(PLANNER, "log", log()); - - BLOCK_MAP.clear(); - PROXY_MAP.clear(); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsOneBlockPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1); - GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2); - GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2)); - mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 2); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, 
split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1); - GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2); - GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 2); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert 
ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureMappers(ID_3, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2)); - mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - 
ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2); - GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - - plan(3, split1, split2, split3); - 
assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * @throws IgniteCheckedException If failed. - */ - public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3); - GridHadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3); - - mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3)); - mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3)); - - plan(1, split1); - assert ensureMappers(ID_1, split1); - assert ensureReducers(ID_1, 1); - assert ensureEmpty(ID_2); - assert ensureEmpty(ID_3); - - plan(1, split2); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_1); - assert ensureEmpty(ID_3); - - plan(1, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0); - assert ensureEmpty(ID_3); - - plan(2, split1, split2); - assert ensureMappers(ID_1, split1); - assert ensureMappers(ID_2, split2); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureEmpty(ID_3); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - public void testNonIgfsOrphans() throws IgniteCheckedException { - GridHadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2); - GridHadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3); - GridHadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3); - - plan(1, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1); - - plan(2, split1); - assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) || - ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2); - - plan(1, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) || - ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) || - ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1); - - plan(3, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && 
ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1); - assert ensureReducers(ID_2, 1); - assert ensureReducers(ID_3, 1); - - plan(5, split1, split2, split3); - assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) || - ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) || - ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1); - assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) || - ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1); - } - - /** - * Create plan. - * - * @param reducers Reducers count. - * @param splits Splits. - * @return Plan. - * @throws IgniteCheckedException If failed. - */ - private static GridHadoopMapReducePlan plan(int reducers, GridHadoopInputSplit... 
splits) throws IgniteCheckedException { - assert reducers > 0; - assert splits != null && splits.length > 0; - - Collection<GridHadoopInputSplit> splitList = new ArrayList<>(splits.length); - - Collections.addAll(splitList, splits); - - Collection<ClusterNode> top = new ArrayList<>(); - - GridTestNode node1 = new GridTestNode(ID_1); - GridTestNode node2 = new GridTestNode(ID_2); - GridTestNode node3 = new GridTestNode(ID_3); - - node1.setHostName(HOST_1); - node2.setHostName(HOST_2); - node3.setHostName(HOST_3); - - top.add(node1); - top.add(node2); - top.add(node3); - - GridHadoopMapReducePlan plan = PLANNER.preparePlan(new MockJob(reducers, splitList), top, null); - - PLAN.set(plan); - - return plan; - } - - /** - * Ensure that the mappers assigned to the node in the last created plan equal the given splits. - * - * @param nodeId Node ID. - * @param expSplits Expected splits. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureMappers(UUID nodeId, GridHadoopInputSplit... expSplits) { - Collection<GridHadoopInputSplit> expSplitsCol = new ArrayList<>(); - - Collections.addAll(expSplitsCol, expSplits); - - Collection<GridHadoopInputSplit> splits = PLAN.get().mappers(nodeId); - - return F.eq(expSplitsCol, splits); - } - - /** - * Ensure that the last created plan assigns the given number of reducers to the node. - * - * @param nodeId Node ID. - * @param reducers Expected reducers count. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureReducers(UUID nodeId, int reducers) { - int[] reducersArr = PLAN.get().reducers(nodeId); - - return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers); - } - - /** - * Ensure that neither mappers nor reducers are located on this node in the last created plan. - * - * @param nodeId Node ID. - * @return {@code True} if this assumption is valid. - */ - private static boolean ensureEmpty(UUID nodeId) { - return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId)); - } - - /** - * Create split. 
- * - * @param igfs IGFS flag. - * @param file File. - * @param start Start. - * @param len Length. - * @param hosts Hosts. - * @return Split. - */ - private static GridHadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) { - URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file); - - return new GridHadoopFileBlock(hosts, uri, start, len); - } - - /** - * Create block location. - * - * @param start Start. - * @param len Length. - * @param nodeIds Node IDs. - * @return Block location. - */ - private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) { - assert nodeIds != null && nodeIds.length > 0; - - Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length); - - for (UUID id : nodeIds) - nodes.add(new GridTestNode(id)); - - return new IgfsBlockLocationImpl(start, len, nodes); - } - - /** - * Map IGFS block to nodes. - * - * @param file File. - * @param start Start. - * @param len Length. - * @param locations Locations. - */ - private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) { - assert locations != null && locations.length > 0; - - IgfsPath path = new IgfsPath(file); - - Block block = new Block(path, start, len); - - Collection<IgfsBlockLocation> locationsList = new ArrayList<>(); - - Collections.addAll(locationsList, locations); - - BLOCK_MAP.put(block, locationsList); - } - - /** - * Block. - */ - private static class Block { - /** */ - private final IgfsPath path; - - /** */ - private final long start; - - /** */ - private final long len; - - /** - * Constructor. - * - * @param path Path. - * @param start Start. - * @param len Length. 
- */ - private Block(IgfsPath path, long start, long len) { - this.path = path; - this.start = start; - this.len = len; - } - - /** {@inheritDoc} */ - @SuppressWarnings("RedundantIfStatement") - @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Block)) return false; - - Block block = (Block) o; - - if (len != block.len) - return false; - - if (start != block.start) - return false; - - if (!path.equals(block.path)) - return false; - - return true; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = path.hashCode(); - - res = 31 * res + (int) (start ^ (start >>> 32)); - res = 31 * res + (int) (len ^ (len >>> 32)); - - return res; - } - } - - /** - * Mocked job. - */ - private static class MockJob implements GridHadoopJob { - /** Reducers count. */ - private final int reducers; - - /** */ - private Collection<GridHadoopInputSplit> splitList; - - /** - * Constructor. - * - * @param reducers Reducers count. - * @param splitList Splits. - */ - private MockJob(int reducers, Collection<GridHadoopInputSplit> splitList) { - this.reducers = reducers; - this.splitList = splitList; - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId id() { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobInfo info() { - return new GridHadoopDefaultJobInfo() { - @Override public int reducers() { - return reducers; - } - }; - } - - /** {@inheritDoc} */ - @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException { - return splitList; - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void dispose(boolean external) throws IgniteCheckedException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void cleanupStagingDirectory() { - // No-op. - } - } - - /** - * Mocked IGFS. - */ - private static class MockIgfs implements IgfsEx { - /** {@inheritDoc} */ - @Override public boolean isProxy(URI path) { - return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) { - return BLOCK_MAP.get(new Block(path, start, len)); - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, - long maxLen) { - return null; - } - - /** {@inheritDoc} */ - @Override public void stop() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgfsContext context() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsPaths proxyPaths() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsStatus globalSpace() throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Nullable @Override public Boolean globalSampling() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsLocalMetrics localMetrics() { - return null; - } - - /** {@inheritDoc} */ - @Override public long groupBlockSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public String clientLogDirectory() { - return null; - } - - /** {@inheritDoc} */ - @Override public void clientLogDirectory(String logDir) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean evictExclude(IgfsPath path, boolean primary) { - return false; - } - - /** {@inheritDoc} */ - @Nullable @Override public String name() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsConfiguration configuration() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean exists(IgfsPath path) { - return false; - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile info(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsPathSummary summary(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public void rename(IgfsPath src, IgfsPath dest) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean delete(IgfsPath path, boolean recursive) { - return false; - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) { - return null; - } - - /** {@inheritDoc} */ - @Override public long usedSpaceSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public Map<String, String> properties() { - return Collections.emptyMap(); - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, - long blockSize, @Nullable Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, - @Nullable IgniteUuid affKey, int replication, long blockSize, @Nullable Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream append(IgfsPath path, boolean create) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create, - @Nullable Map<String, String> props) { - return null; - } - - /** {@inheritDoc} */ - @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgfsMetrics metrics() { - return null; - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long size(IgfsPath path) { - return 0; - } - - /** {@inheritDoc} */ - @Override public void format() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, - Collection<IgfsPath> paths, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr, - Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, - @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, - @Nullable IgfsRecordResolver rslvr, Collection<IgfsPath> paths, boolean skipNonExistentFiles, - long maxRangeLen, @Nullable T arg) { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid nextAffinityKey() { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFs withAsync() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isAsync() { - return false; - } - - /** {@inheritDoc} */ - @Override public <R> IgniteFuture<R> future() { - return null; - } - } - - /** - * Mocked Grid. 
- */ - @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - private static class MockIgnite extends IgniteSpringBean implements IgniteEx { - /** {@inheritDoc} */ - @Override public IgniteClusterEx cluster() { - return (IgniteClusterEx)super.cluster(); - } - - /** {@inheritDoc} */ - @Override public IgniteFs igfsx(String name) { - assert F.eq("igfs", name); - - return IGFS; - } - - /** {@inheritDoc} */ - @Override public GridHadoop hadoop() { - return null; - } - - /** {@inheritDoc} */ - @Override public String name() { - return null; - } - - /** {@inheritDoc} */ - @Override public <K extends GridCacheUtilityKey, V> GridCacheProjectionEx<K, V> utilityCache(Class<K> keyCls, - Class<V> valCls) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public <K, V> GridCache<K, V> cachex(@Nullable String name) { - return null; - } - - /** {@inheritDoc} */ - @Nullable @Override public <K, V> GridCache<K, V> cachex() { - return null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public Collection<GridCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super GridCache<?, ?>>... 
p) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean eventUserRecordable(int type) { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean allEventsUserRecordable(int[] types) { - return false; - } - - /** {@inheritDoc} */ - @Override public Collection<String> compatibleVersions() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean isJmxRemoteEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean isRestartEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override public ClusterNode localNode() { - return null; - } - - /** {@inheritDoc} */ - @Override public String latestVersion() { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java deleted file mode 100644 index 18e5c03..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopFileSystemsTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.net.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Test file systems for the working directory multi-threading support. - */ -public class GridHadoopFileSystemsTest extends GridHadoopAbstractSelfTest { - private static final int THREAD_COUNT = 3; - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 1; - } - - - /** - * Test the file system with specified URI for the multi-thread working directory support. - * - * @param uri Base URI of the file system (scheme and authority). - * @throws Exception If fails. 
- */ - private void testFileSystem(final URI uri) throws Exception { - final Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - cfg.set(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, - new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString()); - - final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT); - final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT); - - final Path[] newUserInitWorkDir = new Path[THREAD_COUNT]; - final Path[] newWorkDir = new Path[THREAD_COUNT]; - final Path[] newAbsWorkDir = new Path[THREAD_COUNT]; - final Path[] newInstanceWorkDir = new Path[THREAD_COUNT]; - - final AtomicInteger threadNum = new AtomicInteger(0); - - GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - try { - int curThreadNum = threadNum.getAndIncrement(); - - FileSystem fs = FileSystem.get(uri, cfg); - - GridHadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); - - if ("file".equals(uri.getScheme())) - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); - - // Wait until every thread has switched its user before reading the working directory back. - changeUserPhase.countDown(); - changeUserPhase.await(); - - newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum)); - - // Wait until every thread has set its relative working directory. - changeDirPhase.countDown(); - changeDirPhase.await(); - - newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum)); - - // Wait until every thread has set its absolute working directory. - changeAbsDirPhase.countDown(); - changeAbsDirPhase.await(); - - newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory(); - - newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory(); - - finishPhase.countDown(); - } - 
catch (InterruptedException | IOException e) { - // Restore the interrupt flag so the code running this worker can still observe the interruption. - if (e instanceof InterruptedException) - Thread.currentThread().interrupt(); - - error("Failed to execute test thread.", e); - - fail(); - } - } - }, THREAD_COUNT, "filesystems-test"); - - // Bounded wait: a worker that failed never counts down, so an unbounded await() would hang the test. - assertTrue("Test threads did not finish in time.", finishPhase.await(60, TimeUnit.SECONDS)); - - for (int i = 0; i < THREAD_COUNT; i++) { - cfg.set(MRJobConfig.USER_NAME, "user" + i); - - Path workDir = new Path(new Path(uri), "user/user" + i); - - cfg.set(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString()); - - assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory()); - - assertEquals(workDir, newUserInitWorkDir[i]); - - assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]); - - assertEquals(new Path("/folder" + i), newAbsWorkDir[i]); - - assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]); - } - - System.out.println(System.getProperty("user.dir")); - } - - /** - * Test IGFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testIgfs() throws Exception { - testFileSystem(URI.create(igfsScheme())); - } - - /** - * Test HDFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testHdfs() throws Exception { - testFileSystem(URI.create("hdfs://localhost/")); - } - - /** - * Test LocalFS multi-thread working directory. - * - * @throws Exception If fails. 
- */ - public void testLocal() throws Exception { - testFileSystem(URI.create("file:///")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java deleted file mode 100644 index 49099fc..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopGroupingTest.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Grouping test. 
- */ -public class GridHadoopGroupingTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String PATH_OUTPUT = "/test-out"; - - /** */ - private static final GridConcurrentHashSet<UUID> vals = GridHadoopSharedMap.map(GridHadoopGroupingTest.class) - .put("vals", new GridConcurrentHashSet<UUID>()); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } - - /** {@inheritDoc} */ - protected boolean igfsEnabled() { - return false; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testGroupingReducer() throws Exception { - doTestGrouping(false); - } - - /** - * @throws Exception If failed. - */ - public void testGroupingCombiner() throws Exception { - doTestGrouping(true); - } - - /** - * @param combiner With combiner. - * @throws Exception If failed. 
- */ - public void doTestGrouping(boolean combiner) throws Exception { - vals.clear(); - - Job job = Job.getInstance(); - - job.setInputFormatClass(InFormat.class); - job.setOutputFormatClass(OutFormat.class); - - job.setOutputKeyClass(YearTemperature.class); - job.setOutputValueClass(Text.class); - - job.setMapperClass(Mapper.class); - - if (combiner) { - job.setCombinerClass(MyReducer.class); - job.setNumReduceTasks(0); - job.setCombinerKeyGroupingComparatorClass(YearComparator.class); - } - else { - job.setReducerClass(MyReducer.class); - job.setNumReduceTasks(4); - job.setGroupingComparatorClass(YearComparator.class); - } - - grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 2), - createJobInfo(job.getConfiguration())).get(30000); - - assertTrue(vals.isEmpty()); - } - - public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> { - /** */ - int lastYear; - - @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context) - throws IOException, InterruptedException { - X.println("___ : " + context.getTaskAttemptID() + " --> " + key); - - Set<UUID> ids = new HashSet<>(); - - for (Text val : vals0) - assertTrue(ids.add(UUID.fromString(val.toString()))); - - for (Text val : vals0) - assertTrue(ids.remove(UUID.fromString(val.toString()))); - - assertTrue(ids.isEmpty()); - - assertTrue(key.year > lastYear); - - lastYear = key.year; - - for (Text val : vals0) - assertTrue(vals.remove(UUID.fromString(val.toString()))); - } - } - - public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator. 
- /** {@inheritDoc} */ - @Override public int compare(YearTemperature o1, YearTemperature o2) { - return Integer.compare(o1.year, o2.year); - } - - /** {@inheritDoc} */ - @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - throw new IllegalStateException(); - } - } - - public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable { - /** */ - private int year; - - /** */ - private int temperature; - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - out.writeInt(year); - out.writeInt(temperature); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - year = in.readInt(); - temperature = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { // To be partitioned by year. - return year; - } - - /** {@inheritDoc} */ - @Override public int compareTo(YearTemperature o) { - int res = Integer.compare(year, o.year); - - if (res != 0) - return res; - - // Sort comparator by year and temperature, to find max for year. 
- return Integer.compare(o.temperature, temperature); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(YearTemperature.class, this); - } - } - - public static class InFormat extends InputFormat<YearTemperature, Text> { - /** {@inheritDoc} */ - @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { - ArrayList<InputSplit> list = new ArrayList<>(); - - for (int i = 0; i < 10; i++) - list.add(new GridHadoopSortingTest.FakeSplit(20)); - - return list; - } - - /** {@inheritDoc} */ - @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - return new RecordReader<YearTemperature, Text>() { - /** */ - int cnt; - - /** */ - Random rnd = new GridRandom(); - - /** */ - YearTemperature key = new YearTemperature(); - - /** */ - Text val = new Text(); - - @Override public void initialize(InputSplit split, TaskAttemptContext context) { - // No-op. - } - - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - return cnt++ < split.getLength(); - } - - @Override public YearTemperature getCurrentKey() { - key.year = 1990 + rnd.nextInt(10); - key.temperature = 10 + rnd.nextInt(20); - - return key; - } - - @Override public Text getCurrentValue() { - UUID id = UUID.randomUUID(); - - assertTrue(vals.add(id)); - - val.set(id.toString()); - - return val; - } - - @Override public float getProgress() { - return 0; - } - - @Override public void close() { - // No-op. - } - }; - } - } - - /** - * - */ - public static class OutFormat extends OutputFormat { - /** {@inheritDoc} */ - @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - return null; - } - - /** {@inheritDoc} */ - @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return null; - } - } -}