http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java index 7dca049..f23c62c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java @@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener /** IGFS name. */ private final String igfs; + /** The user this out proc is performing on behalf of. */ + private final String userName; + /** Client log. */ private final Log log; @@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { - this(host, port, grid, igfs, false, log); + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException { + this(host, port, grid, igfs, false, log, user); } /** @@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. 
*/ - public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException { - this(null, port, grid, igfs, true, log); + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException { + this(null, port, grid, igfs, true, log, user); } /** @@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener * @param log Client logger. * @throws IOException If failed. */ - private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user) throws IOException { assert host != null && !shmem || host == null && shmem : "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; @@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener this.grid = grid; this.igfs = igfs; this.log = log; + this.userName = IgfsUtils.fixUserName(user); io = HadoopIgfsIpcIo.get(log, endpoint); @@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(INFO); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(UPDATE); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(FILE_RES).get(); } @@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.accessTime(accessTime); msg.modificationTime(modificationTime); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(RENAME); msg.path(src); msg.destinationPath(dest); + msg.userName(userName); return 
io.send(msg).chain(BOOL_RES).get(); } @@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(DELETE); msg.path(path); msg.flag(recursive); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.start(start); msg.length(len); + msg.userName(userName); return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); } @@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(PATH_SUMMARY); msg.path(path); + msg.userName(userName); return io.send(msg).chain(SUMMARY_RES).get(); } @@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(MAKE_DIRECTORIES); msg.path(path); msg.properties(props); + msg.userName(userName); return io.send(msg).chain(BOOL_RES).get(); } @@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_FILES); msg.path(path); + msg.userName(userName); return io.send(msg).chain(FILE_COL_RES).get(); } @@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(LIST_PATHS); msg.path(path); + msg.userName(userName); return io.send(msg).chain(PATH_COL_RES).get(); } @@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.command(OPEN_READ); msg.path(path); msg.flag(false); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(true); msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + msg.userName(userName); IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); @@ -321,6 +337,7 @@ public class 
HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.properties(props); msg.replication(replication); msg.blockSize(blockSize); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener msg.path(path); msg.flag(create); msg.properties(props); + msg.userName(userName); Long streamId = io.send(msg).chain(LONG_RES).get(); @@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener } }; } + + /** {@inheritDoc} */ + @Override public String user() { + return userName; + } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index 1dada21..7d0db49 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs { /** Logger. */ private final Log log; + /** The user name this wrapper works on behalf of. */ + private final String userName; + /** * Constructor. * @@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @param conf Configuration. * @param log Current logger. 
*/ - public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException { + public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) + throws IOException { try { this.authority = authority; this.endpoint = new HadoopIgfsEndpoint(authority); this.logDir = logDir; this.conf = conf; this.log = log; + this.userName = user; } catch (IgniteCheckedException e) { throw new IOException("Failed to parse endpoint: " + authority, e); @@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsInProc(igfs, log); + hadoop = new HadoopIgfsInProc(igfs, log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } catch (IOException | IgniteCheckedException e) { if (e instanceof HadoopIgfsCommunicationException) - hadoop.close(true); + if (hadoop != null) + hadoop.close(true); if (log.isDebugEnabled()) log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e); @@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { try { hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), - log); + log, userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), + log, 
userName); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index e9c859bd..dd18c66 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -239,9 +239,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); try { - FileSystem fs = FileSystem.get(jobConf()); - - HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); + FileSystem.get(jobConf()); LocalFileSystem locFs = FileSystem.getLocal(jobConf()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java index d11cabb..9bcd5de 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; +import 
org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; @@ -39,6 +40,7 @@ import org.jsr166.*; import java.io.*; import java.net.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA /** Thread count for multithreaded tests. */ private static final int THREAD_CNT = 8; + /** Secondary file system user. */ + private static final String SECONDARY_FS_USER = "secondary-default"; + /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA if (mode != PRIMARY) cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(), - secondaryFileSystemConfigPath())); + secondaryFileSystemConfigPath(), SECONDARY_FS_USER)); cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); cfg.setManagementPort(-1); @@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath())); - fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg); + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser()); + + // Create Fs on behalf of the client user: + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg); + + return null; + } + }); barrier = new CyclicBarrier(THREAD_CNT); } + /** + * Gets the user the Fs client operates on behalf of. + * @return The user the Fs client operates on behalf of. 
+ */ + protected String getClientFsUser() { + return "foo"; + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { try { @@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA /** @throws Exception If failed. */ public void testStatus() throws Exception { + Path file1 = new Path("/file1"); - try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class), + try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class), Options.CreateOpts.perms(FsPermission.getDefault()))) { file.write(new byte[1024 * 1024]); } FsStatus status = fs.getFsStatus(); + assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner()); + assertEquals(4, grid(0).cluster().nodes().size()); long used = 0, max = 0; @@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA os.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); + fs.setOwner(file, "aUser", "aGroup"); assertEquals("aUser", fs.getFileStatus(file).getOwner()); @@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA int cnt = 2 * 1024; - FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class), - Options.CreateOpts.perms(FsPermission.getDefault())); + try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class), + Options.CreateOpts.perms(FsPermission.getDefault()))) { - for (long i = 0; i < cnt; i++) - out.writeLong(i); + for (long i = 0; i < cnt; i++) + out.writeLong(i); + } - out.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); - FSDataInputStream in = fs.open(file, 1024); + try (FSDataInputStream in = fs.open(file, 1024)) { - for (long i = 0; i < cnt; i++) - assertEquals(i, in.readLong()); - - in.close(); + for (long i = 0; i < cnt; i++) + assertEquals(i, in.readLong()); + } } /** @throws Exception If failed. 
*/ @@ -1191,6 +1218,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA assertEquals(dirPerm, fs.getFileStatus(dir).getPermission()); assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission()); + + assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner()); + assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner()); } /** @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java index 9e84c51..b089995 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java @@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra primaryConfFullPath = null; SecondaryFileSystemProvider provider = - new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null); + new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath); - primaryFs = provider.createFileSystem(); + primaryFs = provider.createFileSystem(null); primaryFsUri = provider.uri(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java 
b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java index d9a3c59..b828aad 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; @@ -43,6 +44,7 @@ import org.jsr166.*; import java.io.*; import java.lang.reflect.*; import java.net.*; +import java.security.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -72,6 +74,9 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA /** Secondary file system configuration path. */ private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml"; + /** Secondary file system user. */ + private static final String SECONDARY_FS_USER = "secondary-default"; + /** Secondary endpoint configuration. */ protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG; @@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500"; } + /** + * Gets the user the Fs client operates on behalf of. + * @return The user the Fs client operates on behalf of. 
+ */ + protected String getClientFsUser() { + return "foo"; + } + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true); @@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem); - fs = FileSystem.get(primaryFsUri, primaryFsCfg); + UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser()); + assertNotNull(clientUgi); + + // Create the Fs on behalf of the specific user: + clientUgi.doAs(new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + fs = FileSystem.get(primaryFsUri, primaryFsCfg); + + return null; + } + }); barrier = new CyclicBarrier(THREAD_CNT); } @@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA cfg.setDefaultMode(mode); if (mode != PRIMARY) - cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH)); + cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( + SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER)); cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName)); @@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA os.close(); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); + fs.setOwner(file, "aUser", "aGroup"); assertEquals("aUser", fs.getFileStatus(file).getOwner()); @@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA int cnt = 2 * 1024; - FSDataOutputStream out = fs.create(file, true, 1024); - - for (long i = 0; i < cnt; i++) - out.writeLong(i); + try (FSDataOutputStream out = fs.create(file, true, 1024)) { - out.close(); + for (long i = 0; i < cnt; i++) + out.writeLong(i); + } - FSDataInputStream in = 
fs.open(file, 1024); + assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner()); - for (long i = 0; i < cnt; i++) - assertEquals(i, in.readLong()); + try (FSDataInputStream in = fs.open(file, 1024)) { - in.close(); + for (long i = 0; i < cnt; i++) + assertEquals(i, in.readLong()); + } } /** @throws Exception If failed. */ @@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA String path = fs.getFileStatus(file).getPath().toString(); - assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file")); + assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file")); } /** @throws Exception If failed. */ @@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA public void testGetWorkingDirectoryIfDefault() throws Exception { String path = fs.getWorkingDirectory().toString(); - assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous"))); + assertTrue(path.endsWith("/user/" + getClientFsUser())); } /** @throws Exception If failed. 
*/ @@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA @SuppressWarnings("OctalInteger") public void testMkdirs() throws Exception { Path fsHome = new Path(PRIMARY_URI); - Path dir = new Path(fsHome, "/tmp/staging"); - Path nestedDir = new Path(dir, "nested"); + final Path dir = new Path(fsHome, "/tmp/staging"); + final Path nestedDir = new Path(dir, "nested"); - FsPermission dirPerm = FsPermission.createImmutable((short)0700); - FsPermission nestedDirPerm = FsPermission.createImmutable((short)111); + final FsPermission dirPerm = FsPermission.createImmutable((short)0700); + final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111); assertTrue(fs.mkdirs(dir, dirPerm)); assertTrue(fs.mkdirs(nestedDir, nestedDirPerm)); assertEquals(dirPerm, fs.getFileStatus(dir).getPermission()); assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission()); + + assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner()); + assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner()); } /** @throws Exception If failed. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java index b92b213..fcfd587 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java @@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest try { switchHandlerErrorFlag(true); - HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG); + HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null); client.handshake(null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java index e103c5f..2c17ba9 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java @@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null); + cache.clear(); // avoid influence of previous tests in the same process. 
+ String name = "igfs:" + getTestGridName(0) + "@"; Configuration cfg = new Configuration(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java index 8cf31a2..5f90bd4 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*; * Test file systems for the working directory multi-threading support. */ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { + /** the number of threads */ private static final int THREAD_COUNT = 3; /** {@inheritDoc} */ @@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { try { int curThreadNum = threadNum.getAndIncrement(); - FileSystem fs = FileSystem.get(uri, cfg); - - HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum); - if ("file".equals(uri.getScheme())) FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum)); @@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest { } /** - * Test IGFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testIgfs() throws Exception { - testFileSystem(URI.create(igfsScheme())); - } - - /** - * Test HDFS multi-thread working directory. - * - * @throws Exception If fails. - */ - public void testHdfs() throws Exception { - testFileSystem(URI.create("hdfs://localhost/")); - } - - /** * Test LocalFS multi-thread working directory. 
* * @throws Exception If fails. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/35388195/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java index 8a046e0..89bf830 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java @@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { int sigma = max((int)ceil(precission * exp), 5); - X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + + X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission + " sigma: " + sigma); - assertTrue(abs(exp - levelsCnts[level]) <= sigma); + assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails. } }