[IGNITE-958]: IGNITE-218 (Wrong staging permissions while running MR job under hadoop accelerator): IGFS part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/35388195 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/35388195 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/35388195 Branch: refs/heads/ignite-389 Commit: 353881951fcdcc16c3dc31d808d3af6c263f74ce Parents: 7ec4c82 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri May 29 15:31:35 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri May 29 15:31:35 2015 +0300 ---------------------------------------------------------------------- .../igfs/secondary/IgfsSecondaryFileSystem.java | 7 + .../internal/igfs/common/IgfsMarshaller.java | 35 +--- .../igfs/common/IgfsPathControlRequest.java | 22 +++ .../internal/processors/hadoop/HadoopJob.java | 2 +- .../ignite/internal/processors/igfs/IgfsEx.java | 8 +- .../internal/processors/igfs/IgfsImpl.java | 8 +- .../processors/igfs/IgfsIpcHandler.java | 184 ++++++++++--------- .../igfs/IgfsSecondaryFileSystemImpl.java | 9 +- .../internal/processors/igfs/IgfsServer.java | 4 +- .../internal/processors/igfs/IgfsUtils.java | 16 ++ .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 165 ++++++++++++----- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 107 +++++++---- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 32 +++- .../internal/processors/hadoop/HadoopUtils.java | 10 +- .../hadoop/SecondaryFileSystemProvider.java | 53 +++--- .../hadoop/fs/HadoopDistributedFileSystem.java | 91 --------- .../hadoop/fs/HadoopFileSystemsUtils.java | 17 -- .../processors/hadoop/igfs/HadoopIgfsEx.java | 6 + .../hadoop/igfs/HadoopIgfsInProc.java | 170 ++++++++++++----- .../processors/hadoop/igfs/HadoopIgfsIpcIo.java | 2 +- .../hadoop/igfs/HadoopIgfsOutProc.java | 33 +++- .../hadoop/igfs/HadoopIgfsWrapper.java | 19 +- .../hadoop/v2/HadoopV2TaskContext.java | 4 +- .../HadoopIgfs20FileSystemAbstractSelfTest.java | 56 ++++-- 
...oopSecondaryFileSystemConfigurationTest.java | 4 +- .../IgniteHadoopFileSystemAbstractSelfTest.java | 63 +++++-- .../IgniteHadoopFileSystemClientSelfTest.java | 2 +- .../IgniteHadoopFileSystemIpcCacheSelfTest.java | 2 + .../hadoop/HadoopFileSystemsTest.java | 23 +-- .../collections/HadoopSkipListSelfTest.java | 4 +- 30 files changed, 684 insertions(+), 474 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java index 9026eac..cb69352 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java @@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem { * @return Map of properties. */ public Map<String,String> properties(); + + + /** + * Closes the secondary file system. + * @throws IgniteException in case of an error. 
+ */ + public void close() throws IgniteException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java index 11af716..6a6f22a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsMarshaller.java @@ -73,6 +73,7 @@ public class IgfsMarshaller { } /** + * Serializes the message and sends it into the given output stream. * @param msg Message. * @param hdr Message header. * @param out Output. @@ -119,6 +120,7 @@ public class IgfsMarshaller { IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + U.writeString(out, req.userName()); writePath(out, req.path()); writePath(out, req.destinationPath()); out.writeBoolean(req.flag()); @@ -236,6 +238,7 @@ public class IgfsMarshaller { case OPEN_CREATE: { IgfsPathControlRequest req = new IgfsPathControlRequest(); + req.userName(U.readString(in)); req.path(readPath(in)); req.destinationPath(readPath(in)); req.flag(in.readBoolean()); @@ -298,8 +301,6 @@ public class IgfsMarshaller { } } - assert msg != null; - msg.command(cmd); return msg; @@ -341,34 +342,4 @@ public class IgfsMarshaller { return null; } - - /** - * Writes string to output. - * - * @param out Data output. - * @param str String. - * @throws IOException If write failed. - */ - private void writeString(DataOutput out, @Nullable String str) throws IOException { - out.writeBoolean(str != null); - - if (str != null) - out.writeUTF(str); - } - - /** - * Reads string from input. - * - * @param in Data input. - * @return Read string. - * @throws IOException If read failed. 
- */ - @Nullable private String readString(DataInput in) throws IOException { - boolean hasStr = in.readBoolean(); - - if (hasStr) - return in.readUTF(); - - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index 7ed1619..2f6e6e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.igfs.common; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; @@ -63,6 +64,9 @@ public class IgfsPathControlRequest extends IgfsMessage { /** Last modification time. */ private long modificationTime; + /** The user name this control request is made on behalf of. */ + private String userName; + /** * @param path Path. */ @@ -235,4 +239,22 @@ public class IgfsPathControlRequest extends IgfsMessage { @Override public String toString() { return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); } + + /** + * Getter for the user name. + * @return user name. + */ + public final String userName() { + assert userName != null; + + return userName; + } + + /** + * Setter for the user name. + * @param userName the user name. 
+ */ + public final void userName(String userName) { + this.userName = IgfsUtils.fixUserName(userName); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java index 65cb48d..5fd6c81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java @@ -98,5 +98,5 @@ public interface HadoopJob { /** * Cleans up the job staging directory. */ - void cleanupStagingDirectory(); + public void cleanupStagingDirectory(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java index 7c1a837..361f75f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java @@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem { /** Property name for URI of file system. */ public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI"; - /** Property name for user name of file system. */ - public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; + /** Property name for default user name of file system. 
+ * NOTE: for secondary file system this is just a default user name, which is used + * when the 2ndary filesystem is used outside of any user context. + * If another user name is set in the context, 2ndary file system will work on behalf + * of that user, which is different from the default. */ + public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME"; /** * Stops IGFS cleaning all used resources. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index 34636d2..c3495e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx { for (IgfsFileWorkerBatch batch : workerMap.values()) batch.cancel(); - if (secondaryFs instanceof AutoCloseable) - U.closeQuiet((AutoCloseable)secondaryFs); + try { + secondaryFs.close(); + } + catch (Exception e) { + log.error("Failed to close secondary file system.", e); + } } igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index 8a8b858..cfe6ed4 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler { private final int bufSize; // Buffer size. Must not be less then file block size. /** IGFS instance for this handler. */ - private IgfsEx igfs; + private final IgfsEx igfs; /** Resource ID generator. */ - private AtomicLong rsrcIdGen = new AtomicLong(); + private final AtomicLong rsrcIdGen = new AtomicLong(); /** Stopping flag. */ private volatile boolean stopping; @@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler { * @return Response message. * @throws IgniteCheckedException If failed. */ - private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd, + private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd, IgfsMessage msg) throws IgniteCheckedException { - IgfsPathControlRequest req = (IgfsPathControlRequest)msg; + final IgfsPathControlRequest req = (IgfsPathControlRequest)msg; if (log.isDebugEnabled()) log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']'); - IgfsControlResponse res = new IgfsControlResponse(); + final IgfsControlResponse res = new IgfsControlResponse(); + + final String userName = req.userName(); + + assert userName != null; try { - switch (cmd) { - case EXISTS: - res.response(igfs.exists(req.path())); + IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() { + @Override public Void apply() { + switch (cmd) { + case EXISTS: + res.response(igfs.exists(req.path())); - break; + break; - case INFO: - res.response(igfs.info(req.path())); + case INFO: + res.response(igfs.info(req.path())); - break; + break; - case PATH_SUMMARY: - res.response(igfs.summary(req.path())); + case PATH_SUMMARY: + res.response(igfs.summary(req.path())); - break; + break; - case 
UPDATE: - res.response(igfs.update(req.path(), req.properties())); + case UPDATE: + res.response(igfs.update(req.path(), req.properties())); - break; + break; - case RENAME: - igfs.rename(req.path(), req.destinationPath()); + case RENAME: + igfs.rename(req.path(), req.destinationPath()); - res.response(true); + res.response(true); - break; + break; - case DELETE: - res.response(igfs.delete(req.path(), req.flag())); + case DELETE: + res.response(igfs.delete(req.path(), req.flag())); - break; + break; - case MAKE_DIRECTORIES: - igfs.mkdirs(req.path(), req.properties()); + case MAKE_DIRECTORIES: + igfs.mkdirs(req.path(), req.properties()); - res.response(true); + res.response(true); - break; + break; - case LIST_PATHS: - res.paths(igfs.listPaths(req.path())); + case LIST_PATHS: + res.paths(igfs.listPaths(req.path())); - break; + break; - case LIST_FILES: - res.files(igfs.listFiles(req.path())); + case LIST_FILES: + res.files(igfs.listFiles(req.path())); - break; + break; - case SET_TIMES: - igfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); + case SET_TIMES: + igfs.setTimes(req.path(), req.accessTime(), req.modificationTime()); - res.response(true); + res.response(true); - break; + break; - case AFFINITY: - res.locations(igfs.affinity(req.path(), req.start(), req.length())); + case AFFINITY: + res.locations(igfs.affinity(req.path(), req.start(), req.length())); - break; + break; - case OPEN_READ: { - IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) : - igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); + case OPEN_READ: { + IgfsInputStreamAdapter igfsIn = !req.flag() ? 
igfs.open(req.path(), bufSize) : + igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch()); - long streamId = registerResource(ses, igfsIn); + long streamId = registerResource(ses, igfsIn); - if (log.isDebugEnabled()) - log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, - igfsIn.fileInfo().modificationTime()); + IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null, + igfsIn.fileInfo().modificationTime()); - res.response(new IgfsInputStreamDescriptor(streamId, info.length())); + res.response(new IgfsInputStreamDescriptor(streamId, info.length())); - break; - } + break; + } - case OPEN_CREATE: { - long streamId = registerResource(ses, igfs.create( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Overwrite if exists. - affinityKey(req), // Affinity key based on replication factor. - req.replication(),// Replication factor. - req.blockSize(), // Block size. - req.properties() // File properties. - )); + case OPEN_CREATE: { + long streamId = registerResource(ses, igfs.create( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Overwrite if exists. + affinityKey(req), // Affinity key based on replication factor. + req.replication(),// Replication factor. + req.blockSize(), // Block size. + req.properties() // File properties. 
+ )); - if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - case OPEN_APPEND: { - long streamId = registerResource(ses, igfs.append( - req.path(), // Path. - bufSize, // Buffer size. - req.flag(), // Create if absent. - req.properties() // File properties. - )); + case OPEN_APPEND: { + long streamId = registerResource(ses, igfs.append( + req.path(), // Path. + bufSize, // Buffer size. + req.flag(), // Create if absent. + req.properties() // File properties. + )); - if (log.isDebugEnabled()) - log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" + - req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); + if (log.isDebugEnabled()) + log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" + + req.path() + ", streamId=" + streamId + ", ses=" + ses + ']'); - res.response(streamId); + res.response(streamId); - break; - } + break; + } - default: - assert false : "Unhandled path control request command: " + cmd; + default: + assert false : "Unhandled path control request command: " + cmd; - break; - } + break; + } + + return null; + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index 683b317..b8095b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -30,14 +30,14 @@ import java.util.*; */ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { /** Delegate. */ - private final IgfsImpl igfs; + private final IgfsEx igfs; /** * Constructor. * * @param igfs Delegate. */ - IgfsSecondaryFileSystemImpl(IgfsImpl igfs) { + IgfsSecondaryFileSystemImpl(IgfsEx igfs) { this.igfs = igfs; } @@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { @Override public Map<String, String> properties() { return Collections.emptyMap(); } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + igfs.stop(true); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java index 253d5be..caa6866 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -239,13 +239,13 @@ public class IgfsServer { */ private class ClientWorker extends GridWorker { /** Connected client endpoint. */ - private IpcEndpoint endpoint; + private final IpcEndpoint endpoint; /** Data output stream. */ private final IgfsDataOutputStream out; /** Client session object. 
*/ - private IgfsClientSession ses; + private final IgfsClientSession ses; /** Queue node for fast unlink. */ private ConcurrentLinkedDeque8.Node<ClientWorker> node; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 4b0234f..8026a44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; import java.lang.reflect.*; @@ -88,4 +90,18 @@ public class IgfsUtils { private IgfsUtils() { // No-op. } + + /** + * Provides non-null user name. + * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME}, + * which is the current process owner user. + * @param user a user name to be fixed. + * @return non-null interned user name. 
+ */ + public static String fixUserName(@Nullable String user) { + if (F.isEmpty(user)) + user = FileSystemConfiguration.DFLT_USER_NAME; + + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index ba891f8..6a630fb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -20,15 +20,16 @@ package org.apache.ignite.hadoop.fs; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.ipc.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import org.jetbrains.annotations.*; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap.*; import java.io.*; import java.net.*; @@ -37,15 +38,45 @@ import java.util.*; import static org.apache.ignite.internal.processors.igfs.IgfsEx.*; /** - * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. + * Adapter to use any Hadoop file system {@link FileSystem} as {@link IgfsSecondaryFileSystem}. 
+ * In fact, this class deals with different FileSystems depending on the user context, + * see {@link IgfsUserContext#currentUser()}. */ -public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, AutoCloseable { - /** Hadoop file system. */ - private final FileSystem fileSys; - - /** Properties of file system */ +public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem { + /** Properties of file system, see {@link #properties()} + * + * See {@link IgfsEx#SECONDARY_FS_CONFIG_PATH} + * See {@link IgfsEx#SECONDARY_FS_URI} + * See {@link IgfsEx#SECONDARY_FS_USER_NAME} + * */ private final Map<String, String> props = new HashMap<>(); + /** Secondary file system provider. */ + private final SecondaryFileSystemProvider secProvider; + + /** The default user name. It is used if no user context is set. */ + private final String dfltUserName; + + /** FileSystem instance created for the default user. + * Stored outside the fileSysLazyMap due to performance reasons. */ + private final FileSystem dfltFs; + + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private final HadoopLazyConcurrentMap<String, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new ValueFactory<String, FileSystem>() { + @Override public FileSystem createValue(String key) { + try { + assert !F.isEmpty(key); + + return secProvider.createFileSystem(key); + } + catch (IOException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + /** * Simple constructor that is to be used by default. * @@ -77,8 +108,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @throws IgniteCheckedException In case of error. 
*/ public IgniteHadoopIgfsSecondaryFileSystem(@Nullable String uri, @Nullable String cfgPath, - @Nullable String userName) - throws IgniteCheckedException { + @Nullable String userName) throws IgniteCheckedException { // Treat empty uri and userName arguments as nulls to improve configuration usability: if (F.isEmpty(uri)) uri = null; @@ -89,27 +119,31 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys if (F.isEmpty(userName)) userName = null; + this.dfltUserName = IgfsUtils.fixUserName(userName); + try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(uri, cfgPath, userName); + this.secProvider = new SecondaryFileSystemProvider(uri, cfgPath); - fileSys = secProvider.createFileSystem(); + // File system creation for the default user name. + // The value is *not* stored in the 'fileSysLazyMap' cache, but saved in field: + this.dfltFs = secProvider.createFileSystem(dfltUserName); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } - uri = secProvider.uri().toString(); + assert dfltFs != null; - if (!uri.endsWith("/")) - uri += "/"; + uri = secProvider.uri().toString(); - if (cfgPath != null) - props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); + if (!uri.endsWith("/")) + uri += "/"; - if (userName != null) - props.put(SECONDARY_FS_USER_NAME, userName); + if (cfgPath != null) + props.put(SECONDARY_FS_CONFIG_PATH, cfgPath); - props.put(SECONDARY_FS_URI, uri); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } + props.put(SECONDARY_FS_URI, uri); + props.put(SECONDARY_FS_USER_NAME, dfltUserName); } /** @@ -119,7 +153,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @return Hadoop path. 
*/ private Path convert(IgfsPath path) { - URI uri = fileSys.getUri(); + URI uri = fileSysForUser().getUri(); return new Path(uri.getScheme(), uri.getAuthority(), path.toString()); } @@ -131,14 +165,9 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys * @param detailMsg Detailed error message. * @return Appropriate exception. */ - @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"}) private IgfsException handleSecondaryFsError(IOException e, String detailMsg) { - boolean wrongVer = X.hasCause(e, RemoteException.class) || - (e.getMessage() != null && e.getMessage().contains("Failed on local")); - - return !wrongVer ? cast(detailMsg, e) : - new IgfsInvalidHdfsVersionException("HDFS version you are connecting to differs from local " + - "version.", e); } + return cast(detailMsg, e); + } /** * Cast IO exception to IGFS exception. @@ -178,7 +207,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean exists(IgfsPath path) { try { - return fileSys.exists(convert(path)); + return fileSysForUser().exists(convert(path)); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to check file existence [path=" + path + "]"); @@ -189,6 +218,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, String> props) { HadoopIgfsProperties props0 = new HadoopIgfsProperties(props); + final FileSystem fileSys = fileSysForUser(); + try { if (props0.userName() != null || props0.groupName() != null) fileSys.setOwner(convert(path), props0.userName(), props0.groupName()); @@ -208,7 +239,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public void rename(IgfsPath src, IgfsPath dest) { // Delegate to the secondary file system. 
try { - if (!fileSys.rename(convert(src), convert(dest))) + if (!fileSysForUser().rename(convert(src), convert(dest))) throw new IgfsException("Failed to rename (secondary file system returned false) " + "[src=" + src + ", dest=" + dest + ']'); } @@ -220,7 +251,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public boolean delete(IgfsPath path, boolean recursive) { try { - return fileSys.delete(convert(path), recursive); + return fileSysForUser().delete(convert(path), recursive); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to delete file [path=" + path + ", recursive=" + recursive + "]"); @@ -230,7 +261,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path) { try { - if (!fileSys.mkdirs(convert(path))) + if (!fileSysForUser().mkdirs(convert(path))) throw new IgniteException("Failed to make directories [path=" + path + "]"); } catch (IOException e) { @@ -241,7 +272,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) { try { - if (!fileSys.mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) + if (!fileSysForUser().mkdirs(convert(path), new HadoopIgfsProperties(props).permission())) throw new IgniteException("Failed to make directories [path=" + path + ", props=" + props + "]"); } catch (IOException e) { @@ -252,7 +283,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection<IgfsPath> listPaths(IgfsPath path) { try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); + FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ 
-275,7 +306,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public Collection<IgfsFile> listFiles(IgfsPath path) { try { - FileStatus[] statuses = fileSys.listStatus(convert(path)); + FileStatus[] statuses = fileSysForUser().listStatus(convert(path)); if (statuses == null) throw new IgfsPathNotFoundException("Failed to list files (path not found): " + path); @@ -302,13 +333,13 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) { - return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSys, convert(path), bufSize); + return new HadoopIgfsSecondaryFileSystemPositionedReadable(fileSysForUser(), convert(path), bufSize); } /** {@inheritDoc} */ @Override public OutputStream create(IgfsPath path, boolean overwrite) { try { - return fileSys.create(convert(path), overwrite); + return fileSysForUser().create(convert(path), overwrite); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", overwrite=" + overwrite + "]"); @@ -322,8 +353,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys new HadoopIgfsProperties(props != null ? 
props : Collections.<String, String>emptyMap()); try { - return fileSys.create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, - null); + return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, + (short)replication, blockSize, null); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to create file [path=" + path + ", props=" + props + @@ -336,7 +367,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys @Override public OutputStream append(IgfsPath path, int bufSize, boolean create, @Nullable Map<String, String> props) { try { - return fileSys.append(convert(path), bufSize); + return fileSysForUser().append(convert(path), bufSize); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]"); @@ -346,7 +377,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys /** {@inheritDoc} */ @Override public IgfsFile info(final IgfsPath path) { try { - final FileStatus status = fileSys.getFileStatus(convert(path)); + final FileStatus status = fileSysForUser().getFileStatus(convert(path)); if (status == null) return null; @@ -421,7 +452,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys try { // We don't use FileSystem#getUsed() since it counts only the files // in the filesystem root, not all the files recursively. 
- return fileSys.getContentSummary(new Path("/")).getSpaceConsumed(); + return fileSysForUser().getContentSummary(new Path("/")).getSpaceConsumed(); } catch (IOException e) { throw handleSecondaryFsError(e, "Failed to get used space size of file system."); @@ -429,25 +460,57 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys } /** {@inheritDoc} */ - @Nullable @Override public Map<String, String> properties() { + @Override public Map<String, String> properties() { return props; } /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { + @Override public void close() throws IgniteException { + Exception e = null; + try { - fileSys.close(); + dfltFs.close(); } - catch (IOException e) { - throw new IgniteCheckedException(e); + catch (Exception e0) { + e = e0; + } + + try { + fileSysLazyMap.close(); + } + catch (IgniteCheckedException ice) { + if (e == null) + e = ice; } + + if (e != null) + throw new IgniteException(e); } /** * Gets the underlying {@link FileSystem}. + * This method is used solely for testing. * @return the underlying Hadoop {@link FileSystem}. */ public FileSystem fileSystem() { - return fileSys; + return fileSysForUser(); + } + + /** + * Gets the FileSystem for the current context user. + * @return the FileSystem instance, never null. + */ + private FileSystem fileSysForUser() { + String user = IgfsUserContext.currentUser(); + + if (F.isEmpty(user)) + user = dfltUserName; // default is never empty. 
+ + assert !F.isEmpty(user); + + if (F.eq(user, dfltUserName)) + return dfltFs; // optimization + + return fileSysLazyMap.getOrCreate(user); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 1f53a06..c0a9ade 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.hdfs.*; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; @@ -97,21 +98,8 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Grid remote client. */ private HadoopIgfsWrapper rmtClient; - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>(){ - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>(){ - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; + /** working directory. */ + private Path workingDir; /** Default replication factor. */ private short dfltReplication; @@ -129,6 +117,9 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Secondary URI string. */ private URI secondaryUri; + /** The user name this file system was created on behalf of. 
*/ + private String user; + /** IGFS mode resolver. */ private IgfsModeResolver modeRslvr; @@ -182,6 +173,36 @@ public class IgniteHadoopFileSystem extends FileSystem { } /** + * Gets non-null and interned user name as per the Hadoop file system viewpoint. + * @return the user name, never null. + */ + public static String getFsHadoopUser(Configuration cfg) throws IOException { + String user = null; + + // ------------------------------------------- + // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761 + // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect + // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct + // ugi.doAs() closure. + if (cfg != null) + user = cfg.get(MRJobConfig.USER_NAME); + // ------------------------------------------- + + if (user == null) { + UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); + + if (currUgi != null) + user = currUgi.getShortUserName(); + } + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + return user; + } + + /** * Public setter that can be used by direct users of FS or Visor. * * @param colocateFileWrites Whether all ongoing file writes should be colocated. @@ -221,7 +242,7 @@ public class IgniteHadoopFileSystem extends FileSystem { uriAuthority = uri.getAuthority(); - setUser(cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + user = getFsHadoopUser(cfg); // Override sequential reads before prefetch if needed. seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); @@ -244,7 +265,7 @@ public class IgniteHadoopFileSystem extends FileSystem { String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); // Handshake. 
IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -289,13 +310,12 @@ public class IgniteHadoopFileSystem extends FileSystem { String secUri = props.get(SECONDARY_FS_URI); String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUserName = props.get(SECONDARY_FS_USER_NAME); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - secUserName); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + + secondaryFs = secProvider.createFileSystem(user); - secondaryFs = secProvider.createFileSystem(); secondaryUri = secProvider.uri(); } catch (IOException e) { @@ -306,6 +326,9 @@ public class IgniteHadoopFileSystem extends FileSystem { "will have no effect): " + e.getMessage()); } } + + // set working directory to the home directory of the current Fs user: + setWorkingDirectory(null); } finally { leaveBusy(); @@ -849,22 +872,11 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); + Path path = new Path("/user/" + user); return path.makeQualified(getUri(), null); } - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. 
- */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(null); - } - /** {@inheritDoc} */ @Override public void setWorkingDirectory(Path newPath) { if (newPath == null) { @@ -873,7 +885,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(homeDir)); - workingDir.set(homeDir); + workingDir = homeDir; } else { Path fixedNewPath = fixRelativePart(newPath); @@ -886,13 +898,13 @@ public class IgniteHadoopFileSystem extends FileSystem { if (secondaryFs != null) secondaryFs.setWorkingDirectory(toSecondary(fixedNewPath)); - workingDir.set(fixedNewPath); + workingDir = fixedNewPath; } } /** {@inheritDoc} */ @Override public Path getWorkingDirectory() { - return workingDir.get(); + return workingDir; } /** {@inheritDoc} */ @@ -1153,7 +1165,7 @@ public class IgniteHadoopFileSystem extends FileSystem { return null; return path.isAbsolute() ? new IgfsPath(path.toUri().getPath()) : - new IgfsPath(convert(workingDir.get()), path.toUri().getPath()); + new IgfsPath(convert(workingDir), path.toUri().getPath()); } /** @@ -1191,9 +1203,16 @@ public class IgniteHadoopFileSystem extends FileSystem { */ @SuppressWarnings("deprecation") private FileStatus convert(IgfsFile file) { - return new FileStatus(file.length(), file.isDirectory(), getDefaultReplication(), - file.groupBlockSize(), file.modificationTime(), file.accessTime(), permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), file.property(PROP_GROUP_NAME, "users"), + return new FileStatus( + file.length(), + file.isDirectory(), + getDefaultReplication(), + file.groupBlockSize(), + file.modificationTime(), + file.accessTime(), + permission(file), + file.property(PROP_USER_NAME, user), + file.property(PROP_GROUP_NAME, "users"), convert(file.path())) { @Override public String toString() { return "FileStatus [path=" + getPath() + ", isDir=" + isDir() + ", len=" + getLen() + @@ -1247,4 +1266,12 @@ 
public class IgniteHadoopFileSystem extends FileSystem { @Override public String toString() { return S.toString(IgniteHadoopFileSystem.class, this); } + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 9cfb79b..f3fbe9c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.*; -import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; @@ -40,6 +39,7 @@ import java.util.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.configuration.FileSystemConfiguration.*; +import static org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.*; import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.*; @@ -91,11 +91,14 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea /** Grid remote client. */ private HadoopIgfsWrapper rmtClient; + /** The name of the user this File System was created on behalf of. */ + private final String user; + /** Working directory. */ private IgfsPath workingDir; /** URI. 
*/ - private URI uri; + private final URI uri; /** Authority. */ private String uriAuthority; @@ -141,6 +144,8 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea uri = name; + user = getFsHadoopUser(cfg); + try { initialize(name, cfg); } @@ -152,7 +157,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea throw e; } - workingDir = new IgfsPath("/user/" + cfg.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); + workingDir = new IgfsPath("/user/" + user); } /** {@inheritDoc} */ @@ -240,7 +245,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String logDir = logDirFile != null ? logDirFile.getAbsolutePath() : null; - rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG); + rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG, user); // Handshake. IgfsHandshakeResponse handshake = rmtClient.handshake(logDir); @@ -284,13 +289,12 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea String secUri = props.get(SECONDARY_FS_URI); String secConfPath = props.get(SECONDARY_FS_CONFIG_PATH); - String secUserName = props.get(SECONDARY_FS_USER_NAME); try { - SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath, - secUserName); + SecondaryFileSystemProvider secProvider = new SecondaryFileSystemProvider(secUri, secConfPath); + + secondaryFs = secProvider.createAbstractFileSystem(user); - secondaryFs = secProvider.createAbstractFileSystem(); secondaryUri = secProvider.uri(); } catch (IOException e) { @@ -929,7 +933,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea file.modificationTime(), file.accessTime(), permission(file), - file.property(PROP_USER_NAME, DFLT_USER_NAME), + file.property(PROP_USER_NAME, user), file.property(PROP_GROUP_NAME, "users"), convert(file.path())) { @Override public String toString() { @@ -983,4 +987,12 @@ public class 
IgniteHadoopFileSystem extends AbstractFileSystem implements Closea @Override public String toString() { return S.toString(IgniteHadoopFileSystem.class, this); } -} + + /** + * Returns the user name this File System is created on behalf of. + * @return the user name + */ + public String user() { + return user; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 00be422..d493bd4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -126,11 +126,15 @@ public class HadoopUtils { break; case PHASE_REDUCE: - assert status.totalReducerCnt() > 0; - + // TODO: temporarily fixed, but why could PHASE_REDUCE have 0 reducers? 
+ // See https://issues.apache.org/jira/browse/IGNITE-764 setupProgress = 1; mapProgress = 1; - reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + + if (status.totalReducerCnt() > 0) + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + else + reduceProgress = 1f; break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index 27805f8..b1a057c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; +import org.apache.hadoop.security.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; import java.net.*; +import java.security.*; /** * Encapsulates logic of secondary filesystem creation. @@ -36,9 +39,6 @@ public class SecondaryFileSystemProvider { /** The secondary filesystem URI, never null. */ private final URI uri; - /** Optional user name to log into secondary filesystem with. */ - private @Nullable final String userName; - /** * Creates new provider with given config parameters. The configuration URL is optional. The filesystem URI must be * specified either explicitly or in the configuration provided. 
@@ -47,13 +47,10 @@ public class SecondaryFileSystemProvider { * property in the provided configuration. * @param secConfPath the secondary Fs path (file path on the local file system, optional). * See {@link IgniteUtils#resolveIgniteUrl(String)} on how the path resolved. - * @param userName User name. * @throws IOException */ public SecondaryFileSystemProvider(final @Nullable String secUri, - final @Nullable String secConfPath, @Nullable String userName) throws IOException { - this.userName = userName; - + final @Nullable String secConfPath) throws IOException { if (secConfPath != null) { URL url = U.resolveIgniteUrl(secConfPath); @@ -88,20 +85,18 @@ public class SecondaryFileSystemProvider { * @return {@link org.apache.hadoop.fs.FileSystem} instance for this secondary Fs. * @throws IOException */ - public FileSystem createFileSystem() throws IOException { + public FileSystem createFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(userName); + final FileSystem fileSys; - if (userName == null) - fileSys = FileSystem.get(uri, cfg); - else { - try { - fileSys = FileSystem.get(uri, cfg, userName); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try { + fileSys = FileSystem.get(uri, cfg, userName); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); - throw new IOException("Failed to create file system due to interrupt.", e); - } + throw new IOException("Failed to create file system due to interrupt.", e); } return fileSys; @@ -109,10 +104,26 @@ public class SecondaryFileSystemProvider { /** * @return {@link org.apache.hadoop.fs.AbstractFileSystem} instance for this secondary Fs. - * @throws IOException + * @throws IOException in case of error. 
*/ - public AbstractFileSystem createAbstractFileSystem() throws IOException { - return AbstractFileSystem.get(uri, cfg); + public AbstractFileSystem createAbstractFileSystem(String userName) throws IOException { + userName = IgfsUtils.fixUserName(userName); + + String ticketCachePath = cfg.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); + + UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, userName); + + try { + return ugi.doAs(new PrivilegedExceptionAction<AbstractFileSystem>() { + @Override public AbstractFileSystem run() throws IOException { + return AbstractFileSystem.get(uri, cfg); + } + }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException("Failed to create file system due to interrupt.", ie); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java deleted file mode 100644 index 509f443..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.net.*; - -import static org.apache.ignite.configuration.FileSystemConfiguration.*; - -/** - * Wrapper of HDFS for support of separated working directory. - */ -public class HadoopDistributedFileSystem extends DistributedFileSystem { - /** User name for each thread. */ - private final ThreadLocal<String> userName = new ThreadLocal<String>() { - /** {@inheritDoc} */ - @Override protected String initialValue() { - return DFLT_USER_NAME; - } - }; - - /** Working directory for each thread. */ - private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() { - /** {@inheritDoc} */ - @Override protected Path initialValue() { - return getHomeDirectory(); - } - }; - - /** {@inheritDoc} */ - @Override public void initialize(URI uri, Configuration conf) throws IOException { - super.initialize(uri, conf); - - setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME)); - } - - /** - * Set user name and default working directory for current thread. - * - * @param userName User name. 
- */ - public void setUser(String userName) { - this.userName.set(userName); - - setWorkingDirectory(getHomeDirectory()); - } - - /** {@inheritDoc} */ - @Override public Path getHomeDirectory() { - Path path = new Path("/user/" + userName.get()); - - return path.makeQualified(getUri(), null); - } - - /** {@inheritDoc} */ - @Override public void setWorkingDirectory(Path dir) { - Path fixedDir = fixRelativePart(dir); - - String res = fixedDir.toUri().getPath(); - - if (!DFSUtil.isValidName(res)) - throw new IllegalArgumentException("Invalid DFS directory name " + res); - - workingDir.set(fixedDir); - } - - /** {@inheritDoc} */ - @Override public Path getWorkingDirectory() { - return workingDir.get(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java index f3f51d4..d90bc28 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java @@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.ignite.hadoop.fs.v1.*; /** * Utilities for configuring file systems to support the separate working directory per each thread. @@ -30,19 +28,6 @@ public class HadoopFileSystemsUtils { public static final String LOC_FS_WORK_DIR_PROP = "fs." 
+ FsConstants.LOCAL_FS_URI.getScheme() + ".workDir"; /** - * Set user name and default working directory for current thread if it's supported by file system. - * - * @param fs File system. - * @param userName User name. - */ - public static void setUser(FileSystem fs, String userName) { - if (fs instanceof IgniteHadoopFileSystem) - ((IgniteHadoopFileSystem)fs).setUser(userName); - else if (fs instanceof HadoopDistributedFileSystem) - ((HadoopDistributedFileSystem)fs).setUser(userName); - } - - /** * Setup wrappers of filesystems to support the separate working directory. * * @param cfg Config for setup. @@ -51,7 +36,5 @@ public class HadoopFileSystemsUtils { cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName()); cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV2.class.getName()); - - cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java index 2f19226..b9c5113 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsEx.java @@ -85,4 +85,10 @@ public interface HadoopIgfsEx extends HadoopIgfs { * @throws IOException If failed. */ public void flush(HadoopIgfsStreamDelegate delegate) throws IOException; + + /** + * The user this Igfs instance works on behalf of. + * @return the user name. 
+ */ + public String user(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java index 44e531e..47ba0e8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java @@ -23,6 +23,7 @@ import org.apache.ignite.igfs.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import java.io.*; @@ -46,25 +47,35 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** Logger. */ private final Log log; + /** The user this Igfs works on behalf of. */ + private final String user; + /** * Constructor. * * @param igfs Target IGFS. * @param log Log. 
*/ - public HadoopIgfsInProc(IgfsEx igfs, Log log) { + public HadoopIgfsInProc(IgfsEx igfs, Log log, String userName) throws IgniteCheckedException { + this.user = IgfsUtils.fixUserName(userName); + this.igfs = igfs; + this.log = log; bufSize = igfs.configuration().getBlockSize() * 2; } /** {@inheritDoc} */ - @Override public IgfsHandshakeResponse handshake(String logDir) { - igfs.clientLogDirectory(logDir); + @Override public IgfsHandshakeResponse handshake(final String logDir) { + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsHandshakeResponse>() { + @Override public IgfsHandshakeResponse apply() { + igfs.clientLogDirectory(logDir); - return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), - igfs.globalSampling()); + return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), igfs.groupBlockSize(), + igfs.globalSampling()); + } + }); } /** {@inheritDoc} */ @@ -82,9 +93,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsFile info(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.info(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { + return igfs.info(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -95,9 +110,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - return igfs.update(path, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsFile>() { + @Override public IgfsFile apply() { + return igfs.update(path, props); + } + }); } catch (IgniteException e) { throw new 
IgniteCheckedException(e); @@ -108,9 +127,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime) throws IgniteCheckedException { try { - igfs.setTimes(path, accessTime, modificationTime); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.setTimes(path, accessTime, modificationTime); + + return null; + } + }); return true; } @@ -124,9 +149,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IgniteCheckedException { try { - igfs.rename(src, dest); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.rename(src, dest); + + return null; + } + }); return true; } @@ -139,9 +170,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException { try { - return igfs.delete(path, recursive); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Boolean>() { + @Override public Boolean apply() { + return igfs.delete(path, recursive); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -154,18 +189,32 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { /** {@inheritDoc} */ @Override public IgfsStatus fsStatus() throws IgniteCheckedException { try { - return igfs.globalSpace(); + return IgfsUserContext.doAs(user, new Callable<IgfsStatus>() { + @Override 
public IgfsStatus call() throws IgniteCheckedException { + return igfs.globalSpace(); + } + }); } catch (IllegalStateException e) { throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " + "stopping."); } + catch (IgniteCheckedException | RuntimeException | Error e) { + throw e; + } + catch (Exception e) { + throw new AssertionError("Must never go there."); + } } /** {@inheritDoc} */ - @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listPaths(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsPath>>() { + @Override public Collection<IgfsPath> apply() { + return igfs.listPaths(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -176,9 +225,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.listFiles(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsFile>>() { + @Override public Collection<IgfsFile> apply() { + return igfs.listFiles(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -189,9 +242,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException { try { - igfs.mkdirs(path, props); + IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() { + @Override public Void apply() { + igfs.mkdirs(path, props); + + return null; + } + }); 
return true; } @@ -205,9 +264,13 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IgniteCheckedException { try { - return igfs.summary(path); + return IgfsUserContext.doAs(user, new IgniteOutClosure<IgfsPathSummary>() { + @Override public IgfsPathSummary apply() { + return igfs.summary(path); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -219,10 +282,14 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start, final long len) throws IgniteCheckedException { try { - return igfs.affinity(path, start, len); + return IgfsUserContext.doAs(user, new IgniteOutClosure<Collection<IgfsBlockLocation>>() { + @Override public Collection<IgfsBlockLocation> apply() { + return igfs.affinity(path, start, len); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -233,11 +300,15 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } 
catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -248,12 +319,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch) + @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch) throws IgniteCheckedException { try { - IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch); - return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length()); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length()); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -264,13 +339,17 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, - int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite, final boolean colocate, + final int replication, final long blockSize, final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, - colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.create(path, bufSize, overwrite, + colocate ? 
igfs.nextAffinityKey() : null, replication, blockSize, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -281,12 +360,16 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { } /** {@inheritDoc} */ - @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, - @Nullable Map<String, String> props) throws IgniteCheckedException { + @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create, + final @Nullable Map<String, String> props) throws IgniteCheckedException { try { - IgfsOutputStream stream = igfs.append(path, bufSize, create, props); + return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() { + @Override public HadoopIgfsStreamDelegate apply() { + IgfsOutputStream stream = igfs.append(path, bufSize, create, props); - return new HadoopIgfsStreamDelegate(this, stream); + return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream); + } + }); } catch (IgniteException e) { throw new IgniteCheckedException(e); @@ -407,4 +490,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx { if (lsnr0 != null && log.isDebugEnabled()) log.debug("Removed stream event listener [delegate=" + delegate + ']'); } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java index 0264e7b..3561e95 100644 --- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsIpcIo.java @@ -41,7 +41,7 @@ import java.util.concurrent.locks.*; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") public class HadoopIgfsIpcIo implements HadoopIgfsIo { /** Logger. */ - private Log log; + private final Log log; /** Request futures map. */ private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =