Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-218 [created] bfebbaf77


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..01d4719 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.Path;
@@ -24,6 +25,7 @@ import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.*;
 import org.apache.ignite.*;
+import org.apache.ignite.hadoop.fs.v1.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -57,6 +59,8 @@ public class HadoopV2JobResourceManager {
 
     /** Staging directory to deliver the job jar and config to the work nodes. */
     private Path stagingDir;
+//
+//    private FileSystem fs;
 
     /**
      * Creates new instance.
@@ -68,6 +72,10 @@ public class HadoopV2JobResourceManager {
         this.jobId = jobId;
         this.ctx = ctx;
         this.log = log.getLogger(HadoopV2JobResourceManager.class);
+//
+//        assert fs != null;
+//
+//        this.fs = fs;
     }
 
     /**
@@ -93,6 +101,40 @@ public class HadoopV2JobResourceManager {
     }
 
     /**
+     * Common method to get the V1 file system in the MapRed engine.
+     * It creates the file system for the user specified in the
+     * configuration with the {@link MRJobConfig#USER_NAME} property.
+     * @param uri the file system URI.
+     * @param cfg the configuration.
+     * @return the file system.
+     * @throws IOException if the file system cannot be obtained
+     *     (also thrown, as the wrapping cause, if the creating thread is interrupted).
+     */
+    public static FileSystem fileSystemForUser(@Nullable URI uri, @Nullable 
Configuration cfg) throws IOException {
+        final String user = IgniteHadoopFileSystem.getHadoopUser(cfg);
+
+        assert user != null;
+
+        if (uri == null)
+            uri = FileSystem.getDefaultUri(cfg);
+
+        final FileSystem fs;
+        try {
+            fs = FileSystem.get(uri, cfg, user);
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+
+        assert fs != null;
+
+        if (fs instanceof IgniteHadoopFileSystem)
+            //noinspection StringEquality
+            assert user == ((IgniteHadoopFileSystem)fs).user();
+
+        return fs;
+    }
+
+    /**
      * Prepare job resources. Resolve the classpath list and download it if 
needed.
      *
      * @param download {@code true} If need to download resources.
@@ -112,15 +154,15 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
-
-                    if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find 
map-reduce submission directory (does not exist): " +
-                            stagingDir);
-
-                    if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job 
submission directory contents to local file system " +
-                            "[path=" + stagingDir + ", locDir=" + 
jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                    try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), 
cfg)) {
+                        if (!fs.exists(stagingDir))
+                            throw new IgniteCheckedException("Failed to find 
map-reduce submission directory (does not exist): " +
+                                stagingDir);
+
+                        if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, 
cfg))
+                            throw new IgniteCheckedException("Failed to copy 
job submission directory contents to local file system " +
+                                "[path=" + stagingDir + ", locDir=" + 
jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                    }
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -204,34 +246,34 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = srcPath.getFileSystem(cfg);
-
-            if (extract) {
-                File archivesPath = new File(jobLocDir.getAbsolutePath(), 
".cached-archives");
+            try (FileSystem srcFs = fileSystemForUser(srcPath.toUri(), cfg)) {
+                if (extract) {
+                    File archivesPath = new File(jobLocDir.getAbsolutePath(), 
".cached-archives");
 
-                if (!archivesPath.exists() && !archivesPath.mkdir())
-                    throw new IOException("Failed to create directory " +
-                         "[path=" + archivesPath + ", jobId=" + jobId + ']');
+                    if (!archivesPath.exists() && !archivesPath.mkdir())
+                        throw new IOException("Failed to create directory " +
+                            "[path=" + archivesPath + ", jobId=" + jobId + 
']');
 
-                File archiveFile = new File(archivesPath, locName);
+                    File archiveFile = new File(archivesPath, locName);
 
-                FileUtil.copy(srcFs, srcPath, dstFs, new 
Path(archiveFile.toString()), false, cfg);
+                    FileUtil.copy(srcFs, srcPath, dstFs, new 
Path(archiveFile.toString()), false, cfg);
 
-                String archiveNameLC = archiveFile.getName().toLowerCase();
+                    String archiveNameLC = archiveFile.getName().toLowerCase();
 
-                if (archiveNameLC.endsWith(".jar"))
-                    RunJar.unJar(archiveFile, dstPath);
-                else if (archiveNameLC.endsWith(".zip"))
-                    FileUtil.unZip(archiveFile, dstPath);
-                else if (archiveNameLC.endsWith(".tar.gz") ||
-                    archiveNameLC.endsWith(".tgz") ||
-                    archiveNameLC.endsWith(".tar"))
-                    FileUtil.unTar(archiveFile, dstPath);
+                    if (archiveNameLC.endsWith(".jar"))
+                        RunJar.unJar(archiveFile, dstPath);
+                    else if (archiveNameLC.endsWith(".zip"))
+                        FileUtil.unZip(archiveFile, dstPath);
+                    else if (archiveNameLC.endsWith(".tar.gz") ||
+                        archiveNameLC.endsWith(".tgz") ||
+                        archiveNameLC.endsWith(".tar"))
+                        FileUtil.unTar(archiveFile, dstPath);
+                    else
+                        throw new IOException("Cannot unpack archive [path=" + 
srcPath + ", jobId=" + jobId + ']');
+                }
                 else
-                    throw new IOException("Cannot unpack archive [path=" + 
srcPath + ", jobId=" + jobId + ']');
+                    FileUtil.copy(srcFs, srcPath, dstFs, new 
Path(dstPath.toString()), false, cfg);
             }
-            else
-                FileUtil.copy(srcFs, srcPath, dstFs, new 
Path(dstPath.toString()), false, cfg);
         }
 
         if (!res.isEmpty() && rsrcNameProp != null)
@@ -286,8 +328,11 @@ public class HadoopV2JobResourceManager {
      */
     public void cleanupStagingDirectory() {
         try {
-            if (stagingDir != null)
-                stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, 
true);
+            if (stagingDir != null) {
+                try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), 
ctx.getJobConf())) {
+                    fs.delete(stagingDir, true);
+                }
+            }
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + 
stagingDir + ", jobId=" + jobId + ']' , e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 24f10a6..de7ff7f 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -233,9 +233,10 @@ public class HadoopV2TaskContext extends HadoopTaskContext 
{
         
Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            FileSystem fs = FileSystem.get(jobConf());
-
-            HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+            //FileSystem fs = 
HadoopV2JobResourceManager.fileSystemForUser(null, jobConf());
+            //String user = jobConf().getUser();
+            //System.out.println("Setting user ["+user+"] to fs=" + fs + ", 
thread = " + Thread.currentThread());
+            //HadoopFileSystemsUtils.setUser(fs, user); //
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 
@@ -412,7 +413,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws 
IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+        try (FileSystem fs = 
HadoopV2JobResourceManager.fileSystemForUser(jobDir.toUri(), jobConf());
             FSDataInputStream in = 
fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
index b92b213..fcfd587 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
@@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends 
IgfsCommonAbstractTest
         try {
             switchHandlerErrorFlag(true);
 
-            HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, 
getTestGridName(0), "igfs", LOG);
+            HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, 
getTestGridName(0), "igfs", LOG, null);
 
             client.handshake(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 1ff8a0f..bcf6194 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -916,6 +916,16 @@ public class HadoopDefaultMapReducePlannerSelfTest extends 
HadoopAbstractSelfTes
         @Override public IgfsSecondaryFileSystem asSecondary() {
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override public IgfsEx forUser(String userName) throws 
IgniteCheckedException {
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String user() {
+            return null;
+        }
     }
 
     /**
@@ -1000,5 +1010,7 @@ public class HadoopDefaultMapReducePlannerSelfTest 
extends HadoopAbstractSelfTes
         @Override public GridKernalContext context() {
             return null;
         }
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
index 8cf31a2..470542c 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -89,7 +89,7 @@ public class HadoopFileSystemsTest extends 
HadoopAbstractSelfTest {
 
                     FileSystem fs = FileSystem.get(uri, cfg);
 
-                    HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
+                    //HadoopFileSystemsUtils.setUser(fs, "user" + 
curThreadNum);
 
                     if ("file".equals(uri.getScheme()))
                         FileSystem.get(uri, cfg).setWorkingDirectory(new 
Path("file:///user/user" + curThreadNum));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
index 1a93223..e545ca9 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
@@ -39,7 +39,7 @@ public class HadoopStartup {
     public static Configuration configuration() {
         Configuration cfg = new Configuration();
 
-        cfg.set("fs.defaultFS", "igfs://igfs@localhost");
+        cfg.set("fs.defaultFS", "igfs://igfs@localhost:10500");
 
         cfg.set("fs.igfs.impl", 
org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
         cfg.set("fs.AbstractFileSystem.igfs.impl", 
IgniteHadoopFileSystem.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index dc68df7..8637a4e 100644
--- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.processors.hadoop.examples;
 
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.processors.hadoop.*;
 
 import java.io.*;
 
@@ -44,6 +46,24 @@ public class HadoopWordCount2 {
         Job job = getJob(args[0], args[1]);
 
         job.submit();
+
+        printCounters(job);
+
+        job.waitForCompletion(true);
+
+        printCounters(job);
+    }
+
+    private static void printCounters(Job job) throws IOException {
+        Counters counters = job.getCounters();
+
+        for (CounterGroup group : counters) {
+            System.out.println("Group: " + group.getDisplayName() + "," + 
group.getName());
+            System.out.println("  number of counters: " + group.size());
+            for (Counter counter : group) {
+                System.out.println("  - " + counter.getDisplayName() + ": " + 
counter.getName() + ": "+counter.getValue());
+            }
+        }
     }
 
     /**
@@ -55,7 +75,9 @@ public class HadoopWordCount2 {
      * @throws IOException If fails.
      */
     public static Job getJob(String input, String output) throws IOException {
-        Job job = Job.getInstance();
+        Configuration cfg = HadoopStartup.configuration();
+
+        Job job = Job.getInstance(cfg);
 
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1634a685/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 661b310..cb84f7f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -35,7 +35,7 @@
 
     <properties>
         <ignite.edition>fabric</ignite.edition>
-        <hadoop.version>2.4.1</hadoop.version>
+        <hadoop.version>2.6.0</hadoop.version>
         <spring.version>4.1.0.RELEASE</spring.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.build.timestamp.format>MMMM d 
yyyy</maven.build.timestamp.format>

Reply via email to