Repository: incubator-ignite Updated Branches: refs/heads/ignite-218 [created] bfebbaf77
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 6f6bfa1..01d4719 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.v2; +import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Path; @@ -24,6 +25,7 @@ import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.v1.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.util.typedef.*; @@ -57,6 +59,8 @@ public class HadoopV2JobResourceManager { /** Staging directory to delivery job jar and config to the work nodes. */ private Path stagingDir; +// +// private FileSystem fs; /** * Creates new instance. @@ -68,6 +72,10 @@ public class HadoopV2JobResourceManager { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); +// +// assert fs != null; +// +// this.fs = fs; } /** @@ -93,6 +101,40 @@ public class HadoopV2JobResourceManager { } /** + * Common method to get the V1 file system in MapRed engine. + * It creates the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * @param uri the file system uri. + * @param cfg the configuration. + * @return the file system + * @throws IOException + * @throws InterruptedException + */ + public static FileSystem fileSystemForUser(@Nullable URI uri, @Nullable Configuration cfg) throws IOException { + final String user = IgniteHadoopFileSystem.getHadoopUser(cfg); + + assert user != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + try { + fs = FileSystem.get(uri, cfg, user); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + + assert fs != null; + + if (fs instanceof IgniteHadoopFileSystem) + //noinspection StringEquality + assert user == ((IgniteHadoopFileSystem)fs).user(); + + return fs; + } + + /** * Prepare job resources. Resolve the classpath list and download it if needed. * * @param download {@code true} If need to download resources. @@ -112,15 +154,15 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg); - - if (!fs.exists(stagingDir)) - throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + - stagingDir); - - if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) - throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + - "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), cfg)) { + if (!fs.exists(stagingDir)) + throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + + stagingDir); + + if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) + throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + } } File jarJobFile = new File(jobLocDir, "job.jar"); @@ -204,34 +246,34 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = srcPath.getFileSystem(cfg); - - if (extract) { - File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); + try (FileSystem srcFs = fileSystemForUser(srcPath.toUri(), cfg)) { + if (extract) { + File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); - if (!archivesPath.exists() && !archivesPath.mkdir()) - throw new IOException("Failed to create directory " + - "[path=" + archivesPath + ", jobId=" + jobId + ']'); + if (!archivesPath.exists() && !archivesPath.mkdir()) + throw new IOException("Failed to create directory " + + "[path=" + archivesPath + ", jobId=" + jobId + ']'); - File archiveFile = new File(archivesPath, locName); + File archiveFile = new File(archivesPath, locName); - FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg); + FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg); - String archiveNameLC = archiveFile.getName().toLowerCase(); + String archiveNameLC = archiveFile.getName().toLowerCase(); - if (archiveNameLC.endsWith(".jar")) - RunJar.unJar(archiveFile, dstPath); - else if (archiveNameLC.endsWith(".zip")) - FileUtil.unZip(archiveFile, dstPath); - else if (archiveNameLC.endsWith(".tar.gz") || - archiveNameLC.endsWith(".tgz") || - archiveNameLC.endsWith(".tar")) - FileUtil.unTar(archiveFile, dstPath); + if (archiveNameLC.endsWith(".jar")) + RunJar.unJar(archiveFile, dstPath); + else if (archiveNameLC.endsWith(".zip")) + FileUtil.unZip(archiveFile, dstPath); + else if (archiveNameLC.endsWith(".tar.gz") || + archiveNameLC.endsWith(".tgz") || + archiveNameLC.endsWith(".tar")) + FileUtil.unTar(archiveFile, dstPath); + else + throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']'); + } else - throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']'); + FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg); } - else - FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg); } if (!res.isEmpty() && rsrcNameProp != null) @@ -286,8 +328,11 @@ public class HadoopV2JobResourceManager { */ public void cleanupStagingDirectory() { try { - if (stagingDir != null) - stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true); + if (stagingDir != null) { + try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), ctx.getJobConf())) { + fs.delete(stagingDir, true); + } + } } catch (Exception e) { log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 24f10a6..de7ff7f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -233,9 +233,10 @@ public class HadoopV2TaskContext extends HadoopTaskContext { Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); try { - FileSystem fs = FileSystem.get(jobConf()); - - HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); + //FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(null, jobConf()); + //String user = jobConf().getUser(); + //System.out.println("Setting user ["+user+"] to fs=" + fs + ", thread = " + Thread.currentThread()); + //HadoopFileSystemsUtils.setUser(fs, user); // LocalFileSystem locFs = FileSystem.getLocal(jobConf()); @@ -412,7 +413,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf()); + try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(jobDir.toUri(), jobConf()); FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java index b92b213..fcfd587 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest try { switchHandlerErrorFlag(true); - HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG); + HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null); client.handshake(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index 1ff8a0f..bcf6194 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -916,6 +916,16 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes @Override public IgfsSecondaryFileSystem asSecondary() { return null; } + + /** {@inheritDoc} */ + @Override public IgfsEx forUser(String userName) throws IgniteCheckedException { + return this; + } + + /** {@inheritDoc} */ + @Override public String user() { + return null; + } } /** @@ -1000,5 +1010,7 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes @Override public GridKernalContext context() { return null; } + + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java index 8cf31a2..470542c 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java @@ -89,7 +89,7 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { FileSystem fs = FileSystem.get(uri, cfg); - HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); + //HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); if ("file".equals(uri.getScheme())) FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java index 1a93223..e545ca9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java @@ -39,7 +39,7 @@ public class HadoopStartup { public static Configuration configuration() { Configuration cfg = new Configuration(); - cfg.set("fs.defaultFS", "igfs://igfs@localhost"); + cfg.set("fs.defaultFS", "igfs://igfs@localhost:10500"); cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName()); cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java index dc68df7..8637a4e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.processors.hadoop.examples; +import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.internal.processors.hadoop.*; import java.io.*; @@ -44,6 +46,24 @@ public class HadoopWordCount2 { Job job = getJob(args[0], args[1]); job.submit(); + + printCounters(job); + + job.waitForCompletion(true); + + printCounters(job); + } + + private static void printCounters(Job job) throws IOException { + Counters counters = job.getCounters(); + + for (CounterGroup group : counters) { + System.out.println("Group: " + group.getDisplayName() + "," + group.getName()); + System.out.println(" number of counters: " + group.size()); + for (Counter counter : group) { + System.out.println(" - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue()); + } + } } /** @@ -55,7 +75,9 @@ public class HadoopWordCount2 { * @throws IOException If fails. */ public static Job getJob(String input, String output) throws IOException { - Job job = Job.getInstance(); + Configuration cfg = HadoopStartup.configuration(); + + Job job = Job.getInstance(cfg); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 661b310..cb84f7f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -35,7 +35,7 @@ <properties> <ignite.edition>fabric</ignite.edition> - <hadoop.version>2.4.1</hadoop.version> + <hadoop.version>2.6.0</hadoop.version> <spring.version>4.1.0.RELEASE</spring.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>