Repository: incubator-ignite Updated Branches: refs/heads/ignite-218 65132a6cb -> 27ad40871
[IGNITE-218]: intermediate commit. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7db58f9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7db58f9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7db58f9e Branch: refs/heads/ignite-218 Commit: 7db58f9ec1c021ad38df7bdf879605f7e9babfec Parents: 65132a6 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Thu Apr 16 22:01:11 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Thu Apr 16 22:01:11 2015 +0300 ---------------------------------------------------------------------- .../processors/igfs/IgfsIpcHandler.java | 185 ++++++++--------- .../fs/IgniteHadoopFileSystemCounterWriter.java | 18 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 51 ++--- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 5 +- .../hadoop/igfs/HadoopIgfsInProc.java | 196 +++++++++++++------ .../hadoop/igfs/HadoopIgfsOutProc.java | 2 +- .../hadoop/igfs/HadoopIgfsWrapper.java | 1 + .../hadoop/taskexecutor/HadoopRunnableTask.java | 16 +- .../processors/hadoop/v2/HadoopV2Job.java | 23 +-- .../hadoop/v2/HadoopV2JobResourceManager.java | 33 ++-- .../hadoop/v2/HadoopV2TaskContext.java | 8 +- parent/pom.xml | 2 +- 12 files changed, 293 insertions(+), 247 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index d8a8bdf..3ba99fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -31,6 +31,7 @@ import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** @@ -241,153 +242,155 @@ class IgfsIpcHandler implements IgfsServerHandler { * @return Response message. * @throws IgniteCheckedException If failed. */ - private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd, + private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd, IgfsMessage msg) throws IgniteCheckedException { - IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + final IgfsPathControlRequest req = (IgfsPathControlRequest)msg; if (log.isDebugEnabled()) log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']'); - IgfsControlResponse res = new IgfsControlResponse(); + final IgfsControlResponse res = new IgfsControlResponse(); final String userName = req.userName(); assert userName != null; - final IgfsEx userIgfs = igfs; //.forUser(userName); - - //IgfsUtils.setContextUser(userName); - try { - switch (cmd) { - case EXISTS: - res.response(userIgfs.exists(req.path())); + IgfsUserContext.doAs(userName, new Callable<Void>() { + @Override public Void call() throws Exception { + switch (cmd) { + case EXISTS: + res.response(igfs.exists(req.path())); - break; + break; - case INFO: - res.response(userIgfs.info(req.path())); + case INFO: + res.response(igfs.info(req.path())); - break; + break; - case PATH_SUMMARY: - res.response(userIgfs.summary(req.path())); + case PATH_SUMMARY: + res.response(igfs.summary(req.path())); - break; + break; - case UPDATE: - res.response(userIgfs.update(req.path(), req.properties())); + case UPDATE: + res.response(igfs.update(req.path(), req.properties())); - break; + break; - case RENAME: - userIgfs.rename(req.path(), req.destinationPath()); + case RENAME: + igfs.rename(req.path(), req.destinationPath()); - res.response(true); + res.response(true); - break; + break; - case DELETE: - res.response(userIgfs.delete(req.path(), req.flag())); + case DELETE: + res.response(igfs.delete(req.path(), req.flag())); - break; + break; - case MAKE_DIRECTORIES: - userIgfs.mkdirs(req.path(), req.properties()); + case MAKE_DIRECTORIES: + igfs.mkdirs(req.path(), req.properties()); - res.response(true); + res.response(true); - break; + break; - case LIST_PATHS: - res.paths(userIgfs.listPaths(req.path())); + case LIST_PATHS: + res.paths(igfs.listPaths(req.path())); - break; + break; - case LIST_FILES: - res.files(userIgfs.listFiles(req.path())); + case LIST_FILES: + res.files(igfs.listFiles(req.path())); - break; + break; - case SET_TIMES: - userIgfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); + case SET_TIMES: + igfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); - res.response(true); + res.response(true); - break; + break; - case AFFINITY: - res.locations(userIgfs.affinity(req.path(), req.start(), req.length())); + case AFFINITY: + res.locations(igfs.affinity(req.path(), req.start(), req.length())); - break; + break; - case OPEN_READ: { - IgfsInputStreamAdapter igfsIn = !req.flag() ? userIgfs.open(req.path(), bufSize) : - userIgfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); + case OPEN_READ: { + IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : + igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); - long streamId = registerResource(ses, igfsIn); + long streamId = registerResource(ses, igfsIn); - if (log.isDebugEnabled()) - log.debug("Opened IGFS input stream for file read [igfsName=" + userIgfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, - igfsIn.fileInfo().modificationTime()); + IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, + igfsIn.fileInfo().modificationTime()); - res.response(new IgfsInputStreamDescriptor(streamId, info.length())); + res.response(new IgfsInputStreamDescriptor(streamId, info.length())); - break; - } + break; + } - case OPEN_CREATE: { - long streamId = registerResource(ses, userIgfs.create( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Overwrite if exists. - affinityKey(req), // Affinity key based on replication factor. - req.replication(),// Replication factor. - req.blockSize(), // Block size. - req.properties() // File properties. - )); + case OPEN_CREATE: { + long streamId = registerResource(ses, igfs.create( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Overwrite if exists. + affinityKey(req), // Affinity key based on replication factor. + req.replication(),// Replication factor. + req.blockSize(), // Block size. + req.properties() // File properties. + )); - if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file create [igfsName=" + userIgfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - case OPEN_APPEND: { - long streamId = registerResource(ses, userIgfs.append( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Create if absent. - req.properties() // File properties. - )); + case OPEN_APPEND: { + long streamId = registerResource(ses, igfs.append( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Create if absent. + req.properties() // File properties. + )); - if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file append [igfsName=" + userIgfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - default: - assert false : "Unhandled path control request command: " + cmd; + default: + assert false : "Unhandled path control request command: " + cmd; - break; - } + break; + } + + return null; + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } if (log.isDebugEnabled()) - log.debug("Finished processing path control request [igfsName=" + userIgfs.name() + ", req=" + req + + log.debug("Finished processing path control request [igfsName=" + igfs.name() + ", req=" + req + ", res=" + res + ']'); return res; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 80f693e..a0927e2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -76,18 +76,18 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try { hadoopCfg.set(MRJobConfig.USER_NAME, user); - FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(jobStatPath.toUri(), hadoopCfg); + try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg)) { + fs.mkdirs(jobStatPath); - fs.mkdirs(jobStatPath); + try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { + for (T2<String, Long> evt : perfCntr.evts()) { + out.print(evt.get1()); + out.print(':'); + out.println(evt.get2().toString()); + } - try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { - for (T2<String, Long> evt : perfCntr.evts()) { - out.print(evt.get1()); - out.print(':'); - out.println(evt.get2().toString()); + out.flush(); } - - out.flush(); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 132a2ee..c53aabf 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.security.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; @@ -173,21 +172,15 @@ public class IgniteHadoopFileSystem extends FileSystem { } /** - * Gets non-null and interned user name as per the Hadoop viewpoint. - * @param cfg the Hadoop job configuration, may be null. + * Gets non-null and interned user name as per the Hadoop file system viewpoint. * @return the user name, never null. */ - public static String getHadoopUser(@Nullable Configuration cfg) throws IOException { + public static String getFsHadoopUser() throws IOException { String user = null; - // TODO: Create ticket to remove these lines. -// // First, try to get the user from MR Job configuration: -// if (cfg != null) -// user = cfg.get(MRJobConfig.USER_NAME); - - UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser(); - if (currentUgi != null) - user = currentUgi.getShortUserName(); + UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); + if (currUgi != null) + user = currUgi.getShortUserName(); user = IgfsUserContext.fixUserName(user); @@ -236,7 +229,7 @@ public class IgniteHadoopFileSystem extends FileSystem { uriAuthority = uri.getAuthority(); - user = getHadoopUser(cfg); + user = getFsHadoopUser(); // Override sequential reads before prefetch if needed. seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); @@ -321,7 +314,7 @@ public class IgniteHadoopFileSystem extends FileSystem { } } - // set working directory to the hone directory of the current Fs user: + // set working directory to the home directory of the current Fs user: setWorkingDirectory(null); } finally { @@ -866,25 +859,11 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + user/*userName.get()*/); + Path path = new Path("/user/" + user); return path.makeQualified(getUri(), null); } -// /** -// * Set user name and default working directory for current thread. -// * -// * @param userName User name. -// */ -// @Deprecated // TODO: remove this method. -// public void setUser(String userName) { -// //System.out.println(this + ": ##### setting user = " + userName + ", thread = " + Thread.currentThread()); -// assert F.eq(user, userName); -// //this.userName.set(userName); -// -// //setWorkingDirectory(null); -// } - /** {@inheritDoc} */ @Override public void setWorkingDirectory(Path newPath) { if (newPath == null) { @@ -893,7 +872,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(homeDir)); - workingDir = homeDir; //.set(homeDir); + workingDir = homeDir; } else { Path fixedNewPath = fixRelativePart(newPath); @@ -906,13 +885,13 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); - workingDir = fixedNewPath; //.set(fixedNewPath); + workingDir = fixedNewPath; } } /** {@inheritDoc} */ @Override public Path getWorkingDirectory() { - return workingDir; //.get(); + return workingDir; } /** {@inheritDoc} */ @@ -1282,12 +1261,4 @@ public class IgniteHadoopFileSystem extends FileSystem { public String user() { return user; } - - /** - * Getter for secondaryFs field. - * @return the secondary file system, if any. - */ - public FileSystem secondaryFs() { - return secondaryFs; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index fd97ed6..8330143 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea uri = name; - user = getHadoopUser(cfg); + user = getFsHadoopUser(); try { initialize(name, cfg); @@ -294,6 +294,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); secondaryFs = secProvider.createAbstractFileSystem(user); + secondaryUri = secProvider.uri(); } catch (IOException e) { @@ -994,4 +995,4 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea public String user() { return user; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index 2b1d836..771388a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -56,11 +56,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { * @param log Log. */ public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { - this.user = userName; + this.user = IgfsUserContext.fixUserName(userName); - this.igfs = igfs; //.forUser(userName); - - //assert this.user == this.igfs.user(); + this.igfs = igfs; this.log = log; @@ -68,11 +66,24 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); + @Override public IgfsHandshakeResponse handshake(final String logDir) { + try { + return IgfsUserContext.doAs(user, new Callable<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse call() throws Exception { + igfs.clientLogDirectory(logDir); + + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); + } + }); + } catch (IgniteException e) { + Throwable t = e.getCause(); - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); + if (t instanceof RuntimeException) + throw (RuntimeException)t; + + throw e; + } } /** {@inheritDoc} */ @@ -90,12 +101,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.info(path); + return IgfsUserContext.doAs(user, new Callable<IgfsFile>() { + @Override public IgfsFile call() throws Exception { + return igfs.info(path); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " + path); @@ -103,12 +118,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - return igfs.update(path, props); + return IgfsUserContext.doAs(user, new Callable<IgfsFile>() { + @Override public IgfsFile call() throws Exception { + return igfs.update(path, props); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " + path); @@ -116,14 +135,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { try { - igfs.setTimes(path, accessTime, modificationTime); + IgfsUserContext.doAs(user, new Callable<Void>() { + @Override public Void call() throws Exception { + igfs.setTimes(path, accessTime, modificationTime); + + return null; + } + }); return true; } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " + @@ -132,14 +157,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { try { - igfs.rename(src, dest); + IgfsUserContext.doAs(user, new Callable<Void>() { + @Override public Void call() throws Exception { + igfs.rename(src, dest); + + return null; + } + }); return true; } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " + src); @@ -147,12 +178,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { try { - return igfs.delete(path, recursive); + return IgfsUserContext.doAs(user, new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return igfs.delete(path, recursive); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " + path); @@ -162,7 +197,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsStatus fsStatus() throws IgniteCheckedException { try { - return igfs.globalSpace(); + return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() { + @Override public IgfsStatus call() throws Exception { + return igfs.globalSpace(); + } + }); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + @@ -171,12 +213,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listPaths(path); + return IgfsUserContext.doAs(user, new Callable<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> call() throws Exception { + return igfs.listPaths(path); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " + path); @@ -184,12 +230,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listFiles(path); + return IgfsUserContext.doAs(user, new Callable<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> call() throws Exception { + return igfs.listFiles(path); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " + path); @@ -197,14 +247,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - igfs.mkdirs(path, props); + IgfsUserContext.doAs(user, new Callable<Void>() { + @Override public Void call() throws Exception { + igfs.mkdirs(path, props); + + return null; + } + }); return true; } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " + @@ -213,12 +269,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.summary(path); + return IgfsUserContext.doAs(user, new Callable<IgfsPathSummary>() { + @Override public IgfsPathSummary call() throws Exception { + return igfs.summary(path); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " + @@ -227,13 +287,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len) throws IgniteCheckedException { try { - return igfs.affinity(path, start, len); + return IgfsUserContext.doAs(user, new Callable<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> call() throws Exception { + return igfs.affinity(path, start, len); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " + path); @@ -241,14 +305,18 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); + return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate call() throws Exception { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); @@ -256,15 +324,19 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate call() throws Exception { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " + path); @@ -272,16 +344,20 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, + final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); + return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate call() throws Exception { + IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, + colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " + path); @@ -289,15 +365,19 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); + return IgfsUserContext.doAs(user, new Callable<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate call() throws Exception { + IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { - throw new IgniteCheckedException(e); + throw new IgniteCheckedException(e.getCause()); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " + path); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 061eed7..639f2eb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -141,7 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener this.grid = grid; this.igfs = igfs; this.log = log; - this.userName = user; + this.userName = IgfsUserContext.fixUserName(user); io = HadoopIgfsIpcIo.get(log, endpoint); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index 1574152..eaf7392 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -55,6 +55,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { /** Logger. */ private final Log log; + /** The user name this wrapper works on behalf of. */ private final String userName; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 9d7125a..fe350b2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.security.*; import org.apache.ignite.*; -import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; @@ -111,8 +110,17 @@ public abstract class HadoopRunnableTask implements Callable<Void> { assert !F.isEmpty(user); - if (F.eq(user, FileSystemConfiguration.DFLT_USER_NAME)) - // do direct call: + String ugiUser; + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + + ugiUser = currUser.getShortUserName(); + } catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + + if (F.eq(user, ugiUser)) + // if current UGI context user is the same, do direct call: return callImpl(); else { // do the call in the context of 'user': @@ -141,7 +149,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> { /** * Runnable task call implementation - * @return + * @return null. * @throws IgniteCheckedException */ Void callImpl() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index ac10687..0d40f34 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -140,7 +140,7 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); - try (FileSystem fs = fileSystemForUser(jobDir.toUri(), jobConf)) { + try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf)) { JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -264,8 +264,6 @@ public class HadoopV2Job implements HadoopJob { if (jobLocDir.exists()) U.delete(jobLocDir); } -// -// disposeFileSystem(); } finally { taskCtxClsPool.clear(); @@ -300,25 +298,6 @@ public class HadoopV2Job implements HadoopJob { } } -// /** -// * Closes the underlying file system. -// * @throws IgniteCheckedException on error. -// */ -// private void disposeFileSystem() throws IgniteCheckedException { -// FileSystem fs0 = fs; -// -// try { -// if (fs0 != null) -// fs0.close(); -// } -// catch (IOException ioe) { -// throw new IgniteCheckedException(ioe); -// } -// finally { -// fs = null; -// } -// } - /** {@inheritDoc} */ @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 01d4719..340891a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -59,8 +59,6 @@ public class HadoopV2JobResourceManager { /** Staging directory to delivery job jar and config to the work nodes. */ private Path stagingDir; -// -// private FileSystem fs; /** * Creates new instance. @@ -72,10 +70,6 @@ public class HadoopV2JobResourceManager { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); -// -// assert fs != null; -// -// this.fs = fs; } /** @@ -101,6 +95,22 @@ public class HadoopV2JobResourceManager { } /** + * Gets non-null and interned user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. + */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + user = user.intern(); + + return user; + } + + /** * Common method to get the V1 file system in MapRed engine. * It creates the filesystem for the user specified in the * configuration with {@link MRJobConfig#USER_NAME} property. @@ -108,10 +118,9 @@ public class HadoopV2JobResourceManager { * @param cfg the configuration. * @return the file system * @throws IOException - * @throws InterruptedException */ - public static FileSystem fileSystemForUser(@Nullable URI uri, @Nullable Configuration cfg) throws IOException { - final String user = IgniteHadoopFileSystem.getHadoopUser(cfg); + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg) throws IOException { + final String user = getMrHadoopUser(cfg); assert user != null; @@ -154,7 +163,7 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), cfg)) { + try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) { if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + stagingDir); @@ -246,7 +255,7 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - try (FileSystem srcFs = fileSystemForUser(srcPath.toUri(), cfg)) { + try (FileSystem srcFs = fileSystemForMrUser(srcPath.toUri(), cfg)) { if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -329,7 +338,7 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) { - try (FileSystem fs = fileSystemForUser(stagingDir.toUri(), ctx.getJobConf())) { + try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf())) { fs.delete(stagingDir, true); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index de7ff7f..d19f4f4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -32,7 +32,6 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -233,11 +232,6 @@ public class HadoopV2TaskContext extends HadoopTaskContext { Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); try { - //FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(null, jobConf()); - //String user = jobConf().getUser(); - //System.out.println("Setting user ["+user+"] to fs=" + fs + ", thread = " + Thread.currentThread()); - //HadoopFileSystemsUtils.setUser(fs, user); // - LocalFileSystem locFs = FileSystem.getLocal(jobConf()); locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath())); @@ -413,7 +407,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForUser(jobDir.toUri(), jobConf()); + try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobDir.toUri(), jobConf()); FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7db58f9e/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 63fbf48..cb84f7f 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -35,7 +35,7 @@ <properties> <ignite.edition>fabric</ignite.edition> - <hadoop.version>2.6.0</hadoop.version> // TODO: Revert. + <hadoop.version>2.6.0</hadoop.version> <spring.version>4.1.0.RELEASE</spring.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.build.timestamp.format>MMMM d yyyy</maven.build.timestamp.format>