[IGNITE-218]: Wrong staging permissions while running MR job under Hadoop accelerator
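At its core, the patch makes map-reduce file-system access run on behalf of the job owner rather than the daemon user, which is what fixes the staging-directory permissions. A minimal sketch of the impersonation pattern it builds on — a simplified form of HadoopV2TaskContext#runAsJobOwner from the diff below; the class and helper names here are illustrative, not part of the commit:

    import org.apache.hadoop.security.UserGroupInformation;
    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;

    public final class RunAsJobOwnerSketch {
        /** Runs the callable under the given user, re-using the current UGI when it already matches. */
        public static <T> T runAs(final String user, final Callable<T> c) throws Exception {
            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();

            // Same user: no context switch is needed, call directly.
            if (user.equals(currUser.getShortUserName()))
                return c.call();

            // Otherwise obtain a UGI for the job owner and run inside its doAs() closure,
            // so FileSystem access and staging-directory operations see the right user.
            UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);

            return ugi.doAs(new PrivilegedExceptionAction<T>() {
                @Override public T run() throws Exception {
                    return c.call();
                }
            });
        }

        private RunAsJobOwnerSketch() {
            // No-op.
        }
    }

Short-circuiting to a direct call when the current UGI already matches the job owner avoids a redundant doAs() switch on the common path, which is the same check the committed implementation performs.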
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c9f72917 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c9f72917 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c9f72917 Branch: refs/heads/ignite-389-ipc Commit: c9f72917843092d596044197cf7cb05c56a13fca Parents: 20e5677 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Thu Jun 4 18:20:24 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Thu Jun 4 18:20:24 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopTaskContext.java | 14 +- .../igfs/IgfsSecondaryFileSystemImpl.java | 2 +- .../fs/IgniteHadoopFileSystemCounterWriter.java | 14 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 70 ++--- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 2 +- .../processors/hadoop/HadoopDefaultJobInfo.java | 2 +- .../internal/processors/hadoop/HadoopUtils.java | 282 ++++++++++++++++++- .../hadoop/SecondaryFileSystemProvider.java | 4 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 20 +- .../processors/hadoop/v2/HadoopV2Job.java | 31 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 26 +- .../hadoop/v2/HadoopV2TaskContext.java | 48 +++- .../hadoop/HadoopClientProtocolSelfTest.java | 6 +- .../hadoop/HadoopAbstractSelfTest.java | 14 +- .../hadoop/HadoopCommandLineTest.java | 14 +- .../processors/hadoop/HadoopMapReduceTest.java | 176 +++++++++++- .../hadoop/HadoopTaskExecutionSelfTest.java | 2 +- .../hadoop/HadoopTasksAllVersionsTest.java | 15 +- .../processors/hadoop/HadoopTasksV1Test.java | 5 +- .../processors/hadoop/HadoopTasksV2Test.java | 5 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 6 +- .../collections/HadoopAbstractMapTest.java | 12 + 22 files changed, 643 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index 371fd81..3d2ee17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -21,13 +21,14 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import java.util.*; +import java.util.concurrent.*; /** * Task context. */ public abstract class HadoopTaskContext { /** */ - private final HadoopJob job; + protected final HadoopJob job; /** */ private HadoopTaskInput input; @@ -187,4 +188,15 @@ public abstract class HadoopTaskContext { * @throws IgniteCheckedException If failed. */ public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; + + /** + * Executes a callable on behalf of the job owner. + * In case of embedded task execution the implementation of this method + * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with. + * @param c The callable. + * @param <T> The return type of the Callable. + * @return The result of the callable. + * @throws IgniteCheckedException On any error in callable. 
+ */ + public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java index b8095b8..44ee90f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java @@ -121,6 +121,6 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem { /** {@inheritDoc} */ @Override public void close() throws IgniteException { - igfs.stop(true); + // No-op. } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 66e9761..d910507 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -20,10 +20,12 @@ package org.apache.ignite.hadoop.fs; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; import java.io.*; @@ -37,9 +39,6 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance"; /** */ - private static final String DEFAULT_USER_NAME = "anonymous"; - - /** */ public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory"; /** */ @@ -52,15 +51,14 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException { - Configuration hadoopCfg = new Configuration(); + Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) hadoopCfg.set(e.getKey(), e.getValue()); String user = jobInfo.user(); - if (F.isEmpty(user)) - user = DEFAULT_USER_NAME; + user = IgfsUtils.fixUserName(user); String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY); @@ -72,7 +70,9 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(cntrs, null); try { - FileSystem fs = jobStatPath.getFileSystem(hadoopCfg); + hadoopCfg.set(MRJobConfig.USER_NAME, user); + + FileSystem fs 
= HadoopUtils.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg, true); fs.mkdirs(jobStatPath); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index c0a9ade..9d94e5b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -22,7 +22,6 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.permission.*; import org.apache.hadoop.hdfs.*; -import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.security.*; import org.apache.hadoop.util.*; import org.apache.ignite.*; @@ -144,9 +143,6 @@ public class IgniteHadoopFileSystem extends FileSystem { /** Custom-provided sequential reads before prefetch. */ private int seqReadsBeforePrefetch; - /** The cache was disabled when the instance was creating. */ - private boolean cacheEnabled; - /** {@inheritDoc} */ @Override public URI getUri() { if (uri == null) @@ -173,27 +169,13 @@ public class IgniteHadoopFileSystem extends FileSystem { } /** - * Gets non-null and interned user name as per the Hadoop file system viewpoint. + * Gets non-null user name as per the Hadoop file system viewpoint. * @return the user name, never null. */ - public static String getFsHadoopUser(Configuration cfg) throws IOException { - String user = null; - - // ------------------------------------------- - // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761 - // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect - // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct - // ugi.doAs() closure. - if (cfg != null) - user = cfg.get(MRJobConfig.USER_NAME); - // ------------------------------------------- - - if (user == null) { - UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); - - if (currUgi != null) - user = currUgi.getShortUserName(); - } + public static String getFsHadoopUser() throws IOException { + UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); + + String user = currUgi.getShortUserName(); user = IgfsUtils.fixUserName(user); @@ -228,10 +210,6 @@ public class IgniteHadoopFileSystem extends FileSystem { setConf(cfg); - String disableCacheName = String.format("fs.%s.impl.disable.cache", name.getScheme()); - - cacheEnabled = !cfg.getBoolean(disableCacheName, false); - mgmt = cfg.getBoolean(IGFS_MANAGEMENT, false); if (!IGFS_SCHEME.equals(name.getScheme())) @@ -242,7 +220,7 @@ public class IgniteHadoopFileSystem extends FileSystem { uriAuthority = uri.getAuthority(); - user = getFsHadoopUser(cfg); + user = getFsHadoopUser(); // Override sequential reads before prefetch if needed. 
seqReadsBeforePrefetch = parameter(cfg, PARAM_IGFS_SEQ_READS_BEFORE_PREFETCH, uriAuthority, 0); @@ -360,15 +338,13 @@ public class IgniteHadoopFileSystem extends FileSystem { @Override protected void finalize() throws Throwable { super.finalize(); - close0(); + close(); } /** {@inheritDoc} */ @Override public void close() throws IOException { - if (cacheEnabled && get(getUri(), getConf()) == this) - return; - - close0(); + if (closeGuard.compareAndSet(false, true)) + close0(); } /** @@ -377,27 +353,25 @@ public class IgniteHadoopFileSystem extends FileSystem { * @throws IOException If failed. */ private void close0() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (LOG.isDebugEnabled()) - LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); + if (LOG.isDebugEnabled()) + LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); - if (rmtClient == null) - return; + if (rmtClient == null) + return; - super.close(); + super.close(); - rmtClient.close(false); + rmtClient.close(false); - if (clientLog.isLogEnabled()) - clientLog.close(); + if (clientLog.isLogEnabled()) + clientLog.close(); - if (secondaryFs != null) - U.closeQuiet(secondaryFs); + if (secondaryFs != null) + U.closeQuiet(secondaryFs); - // Reset initialized resources. - uri = null; - rmtClient = null; - } + // Reset initialized resources. + uri = null; + rmtClient = null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index f3fbe9c..8330143 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -144,7 +144,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea uri = name; - user = getFsHadoopUser(cfg); + user = getFsHadoopUser(); try { initialize(name, cfg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index d0a327e..2e855d0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. 
synchronized (HadoopDefaultJobInfo.class) { if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main"); + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job"); jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index d493bd4..68a9ef6 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -26,10 +26,16 @@ import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.*; import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.io.*; +import java.net.*; import java.util.*; /** @@ -57,6 +63,41 @@ public class HadoopUtils { /** Old reducer class attribute. */ private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fileSysLazyMap = new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. + } + /** * Wraps native split. * @@ -126,8 +167,6 @@ public class HadoopUtils { break; case PHASE_REDUCE: - // TODO: temporary fixed, but why PHASE_REDUCE could have 0 reducers? - // See https://issues.apache.org/jira/browse/IGNITE-764 setupProgress = 1; mapProgress = 1; @@ -304,9 +343,242 @@ public class HadoopUtils { } /** - * Constructor. + * Creates {@link Configuration} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link Configuration}. */ - private HadoopUtils() { - // No-op. 
+ public static Configuration safeCreateConfiguration() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader()); + + try { + return new Configuration(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** + * Creates {@link JobConf} in a correct class loader context to avoid caching + * of inappropriate class loader in the Configuration object. + * @return New instance of {@link JobConf}. + */ + public static JobConf safeCreateJobConf() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader()); + + try { + return new JobConf(); + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** + * Gets non-null user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. + */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It creates the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * @param uri the file system uri. + * @param cfg the configuration. + * @return the file system + * @throws IOException + */ + public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, boolean doCacheFs) throws IOException { + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + if (doCacheFs) { + try { + fs = getWithCaching(uri, cfg, usr); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + } + else { + try { + fs = FileSystem.get(uri, cfg, usr); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IOException(ie); + } + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Note that configuration is not a part of the key. + * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. 
+ */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } + + /** + * Gets FileSystem caching it in static Ignite cache. The cache is a singleton + * for each class loader. + * + * <p/>Note that the file systems in the cache are keyed by a triplet {scheme, authority, user}. + * The Configuration is not a part of the key. This means that for the given key file system is + * initialized only once with the Configuration passed in upon the file system creation. + * + * @param uri The file system URI. + * @param cfg The configuration. + * @param usr The user to create file system for. + * @return The file system: either created, or taken from the cache. + */ + private static FileSystem getWithCaching(URI uri, Configuration cfg, String usr) { + FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + return fileSysLazyMap.getOrCreate(key); + } + + /** + * Gets the property name to disable file system cache. + * @param scheme The file system URI scheme. + * @return The property name. If scheme is null, + * returns "fs.null.impl.disable.cache". + */ + public static String disableFsCachePropertyName(@Nullable String scheme) { + return String.format("fs.%s.impl.disable.cache", scheme); + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + public static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index b1a057c..dd679de 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -34,7 +34,7 @@ import java.security.*; */ public class SecondaryFileSystemProvider { /** Configuration of the secondary filesystem, never null. */ - private final Configuration cfg = new Configuration(); + private final Configuration cfg = HadoopUtils.safeCreateConfiguration(); /** The secondary filesystem URI, never null. 
*/ private final URI uri; @@ -76,7 +76,7 @@ public class SecondaryFileSystemProvider { } // Disable caching: - String prop = String.format("fs.%s.impl.disable.cache", uri.getScheme()); + String prop = HadoopUtils.disableFsCachePropertyName(uri.getScheme()); cfg.setBoolean(prop, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 2e04ac1..b170125 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> { /** {@inheritDoc} */ @Override public Void call() throws IgniteCheckedException { + ctx = job.getTaskContext(info); + + return ctx.runAsJobOwner(new Callable<Void>() { + @Override public Void call() throws Exception { + call0(); + + return null; + } + }); + } + + /** + * Implements actual task running. + * @throws IgniteCheckedException + */ + void call0() throws IgniteCheckedException { execStartTs = U.currentTimeMillis(); Throwable err = null; @@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { HadoopPerformanceCounter perfCntr = null; try { - ctx = job.getTaskContext(info); - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); perfCntr.onTaskSubmit(info, submitTs); @@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { if (ctx != null) ctx.cleanupTaskEnvironment(); } - - return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index d265ca8..d754039 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.hadoop.v2; import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.JobID; @@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob { new ConcurrentHashMap8<>(); /** Pooling task context class and thus class loading environment. */ - private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); /** All created contexts. 
*/ private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); @@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob { hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); - HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader(); - - // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(clsLdr); - - jobConf = new JobConf(); + jobConf = HadoopUtils.safeCreateJobConf(); HadoopFileSystemsUtils.setupFileSystems(jobConf); @@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob { Path jobDir = new Path(jobDirPath); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) { + try { + FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true); + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, jobDir); @@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob { if (old != null) return old.get(); - Class<?> cls = taskCtxClsPool.poll(); + Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll(); try { if (cls == null) { @@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); + "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); - cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); fullCtxClsQueue.add(cls); } @@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public void cleanupStagingDirectory() { - if (rsrcMgr != null) - rsrcMgr.cleanupStagingDirectory(); + rsrcMgr.cleanupStagingDirectory(); + } + + /** + * Getter for job configuration. + * @return The job configuration. + */ + public JobConf jobConf() { + return jobConf; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 6f6bfa1..2f64e77 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -40,6 +40,9 @@ import java.util.*; * files are needed to be placed on local files system. */ public class HadoopV2JobResourceManager { + /** File type Fs disable caching property name. */ + private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file"); + /** Hadoop job context. 
*/ private final JobContextImpl ctx; @@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager { try { cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); - if(!cfg.getBoolean("fs.file.impl.disable.cache", false)) + if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false)) FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); } finally { @@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg); + FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true); if (!fs.exists(stagingDir)) - throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + - stagingDir); + throw new IgniteCheckedException("Failed to find map-reduce submission " + + "directory (does not exist): " + stagingDir); if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) - throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + - "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + throw new IgniteCheckedException("Failed to copy job submission directory " + + "contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + + ", jobId=" + jobId + ']'); } File jarJobFile = new File(jobLocDir, "job.jar"); @@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager { } } else if (!jobLocDir.mkdirs()) - throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); + throw new IgniteCheckedException("Failed to create local job directory: " + + jobLocDir.getAbsolutePath()); setLocalFSWorkingDirectory(jobLocDir); } @@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); - FileSystem srcFs = srcPath.getFileSystem(cfg); + FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); if (!archivesPath.exists() && !archivesPath.mkdir()) throw new IOException("Failed to create directory " + - "[path=" + archivesPath + ", jobId=" + jobId + ']'); + "[path=" + archivesPath + ", jobId=" + jobId + ']'); File archiveFile = new File(archivesPath, locName); @@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) - stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true); + HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true); } catch (Exception e) { log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index dd18c66..e89feba 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.fs.*; import org.apache.ignite.internal.processors.hadoop.v1.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; +import java.security.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -419,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf()); + try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false); FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { in.seek(split.offset()); @@ -448,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext { throw new IgniteCheckedException(e); } } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException { + String user = job.info().user(); + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + String ugiUser; + + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + + assert currUser != null; + + ugiUser = currUser.getShortUserName(); + } + catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + + try { + if (F.eq(user, ugiUser)) + // if current UGI context user is the same, do direct call: + return c.call(); + else { + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + + return ugi.doAs(new PrivilegedExceptionAction<T>() { + @Override public T run() throws Exception { + return c.call(); + } + }); + } + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java index b94d9d1..b9f8179 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.*; import org.apache.ignite.hadoop.mapreduce.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.proto.*; import 
org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * @return Configuration. */ private Configuration config(int port) { - Configuration conf = new Configuration(); + Configuration conf = HadoopUtils.safeCreateConfiguration(); setupFileSystems(conf); @@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { ctx.getCounter(TestCounter.COUNTER2).increment(1); int sum = 0; - for (IntWritable value : values) { + for (IntWritable value : values) sum += value.get(); - } ctx.write(key, new IntWritable(sum)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index af1a1e1..e8a0a6f 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.igfs.*; import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem; import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; @@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** Initial REST port. */ private int restPort = REST_PORT; + /** Secondary file system REST endpoint configuration. */ + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; + + static { + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + + /** Initial classpath. */ private static String initCp; @@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** * @return IGFS configuration. 
*/ - public FileSystemConfiguration igfsConfiguration() { + public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setName(igfsName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java index d10ee5c..c66cdf3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java @@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop; import com.google.common.base.*; import org.apache.ignite.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.resource.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.testframework.junits.common.*; import org.jsr166.*; @@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { - igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName); + String cfgPath = "config/hadoop/default-config.xml"; + + IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath); + + IgniteConfiguration cfg = tup.get1(); + + cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes. 
+ + igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 8a3a0ac..a1ef7ba 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Test of whole cycle of map-reduce processing via Job tracker. */ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; + + /** Secondary file system configuration path. */ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + /** The user to run Hadoop job on behalf of. */ + protected static final String USER = "vasya"; + + /** Secondary IGFS name. */ + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** The secondary Ignite node. */ + protected Ignite igniteSecondary; + + /** The secondary Fs. */ + protected IgfsSecondaryFileSystem secondaryFs; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; } /** + * Gets owner of a IgfsEx path. + * @param p The path. + * @return The owner. + */ + private static String getOwner(IgfsEx i, IgfsPath p) { + return i.info(p).property(IgfsEx.PROP_USER_NAME); + } + + /** + * Gets owner of a secondary Fs path. + * @param secFs The sec Fs. 
+ * @param p The path. + * @return The owner. + */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsEx.PROP_USER_NAME); + } + }); + } + + /** + * Checks owner of the path. + * @param p The path. + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + + /** * Tests whole job execution with all phases in all combination of new and old versions of API. * @throws Exception If fails. */ @@ -71,7 +144,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { JobConf jobConf = new JobConf(); jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser("yyy"); + jobConf.setUser(USER); jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); //To split into about 40 items for v2 @@ -105,14 +178,20 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { checkJobStatistics(jobId); + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; + + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + - useNewReducer, + useNewReducer, "blue\t200000\n" + - "green\t150000\n" + - "red\t100000\n" + - "yellow\t70000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000") - ); + "green\t150000\n" + + "red\t100000\n" + + "yellow\t70000\n", + readAndSortFile(outFile) + ); } } @@ -182,7 +261,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { } } - final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); assert GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -212,4 +291,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']'; } } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); + + super.beforeTest(); + } + + /** + * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. 
+ */ + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** + * @return IGFS configuration. 
+ */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { + FileSystemConfiguration fsCfg = super.igfsConfiguration(); + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); + + return fsCfg; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java index 8dc9830..eee5c8b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { /** {@inheritDoc} */ - @Override public FileSystemConfiguration igfsConfiguration() { + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = super.igfsConfiguration(); cfg.setFragmentizerEnabled(false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java index aaf0f92..6930020 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -22,7 +22,6 @@ import org.apache.hadoop.io.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.net.*; @@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Hadoop job. * @throws IOException If fails. */ - public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception; /** * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API @@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), igfs.info(inFile).length() - fileBlock1.length()); - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); @@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context with mock output. * @throws IgniteCheckedException If fails. 
*/ - private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, + private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType, int taskNum, String... words) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); @@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testReduceTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); @@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testCombinerTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob("/", "/"); + HadoopJob gridJob = getHadoopJob("/", "/"); HadoopTestTaskContext ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); @@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context of combine task with mock output. * @throws IgniteCheckedException If fails. */ - private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) + private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); @@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l); HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index b41a260..48e83cc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.util.*; @@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws IOException If fails. 
*/ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile); setupFileSystems(jobConf); @@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index b677c63..e73fae3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.util.*; @@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws Exception if fails. */ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { Job job = Job.getInstance(); job.setOutputKeyClass(Text.class); @@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index ebc89f4..f3b9307 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + HadoopDefaultJobInfo info = createJobInfo(cfg); + + HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1); + + HadoopJob job = info.createJob(id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); 
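The test hunks above replace direct "new HadoopV2Job(jobId, jobInfo, log)" construction with "jobInfo.createJob(jobId, log)", so the job class is loaded through the dedicated HadoopClassLoader introduced in the HadoopDefaultJobInfo hunk earlier. A rough sketch of what such a factory method inside HadoopDefaultJobInfo can look like, assuming the constructor signature visible in the removed test code — an approximation, not the committed method body:

    /** Lazily initialized job implementation class, loaded by an isolated class loader. */
    private static volatile Class<? extends HadoopJob> jobCls;

    public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
        try {
            Class<? extends HadoopJob> cls = jobCls;

            if (cls == null) {
                synchronized (HadoopDefaultJobInfo.class) {
                    if ((cls = jobCls) == null) {
                        // One class loader holding only Hadoop classes is enough for all jobs.
                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");

                        jobCls = cls = (Class<? extends HadoopJob>)ldr.loadClass(HadoopV2Job.class.getName());
                    }
                }
            }

            // Instantiate reflectively so the instance is bound to the isolated loader.
            return cls.getConstructor(HadoopJobId.class, HadoopDefaultJobInfo.class, IgniteLogger.class)
                .newInstance(jobId, this, log);
        }
        catch (Exception e) {
            throw new IgniteCheckedException(e);
        }
    }

Loading HadoopV2Job reflectively through the isolated loader keeps Hadoop classes out of the caller's class-loading environment, which is why the updated tests refer only to the HadoopJob interface and no longer import the v2 package.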
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c9f72917/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index b4ed5e1..9395c5e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; /** * Abstract class for maps test. @@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { assert false; } + /** {@inheritDoc} */ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { assert false; } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } /**
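For reference, the helpers added in HadoopUtils above combine into one short usage pattern: create the Configuration under the correct context class loader, pin the MR user via MRJobConfig.USER_NAME, and resolve a file system cached per {scheme, authority, user} with Hadoop's own FileSystem cache disabled for managed entries. A hypothetical usage sketch — the IGFS URI is made up, and "vasya" mirrors the USER constant in HadoopMapReduceTest:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.ignite.internal.processors.hadoop.HadoopUtils;

    public class MrUserFsUsageSketch {
        public static void main(String[] args) throws Exception {
            // Create the configuration with the proper context class loader.
            Configuration cfg = HadoopUtils.safeCreateConfiguration();

            // The user on whose behalf the file system is resolved.
            cfg.set(MRJobConfig.USER_NAME, "vasya");

            // Cached per {scheme, authority, user}; the URI here is hypothetical.
            FileSystem fs = HadoopUtils.fileSystemForMrUser(
                URI.create("igfs://igfs@127.0.0.1:10500/"), cfg, true);

            System.out.println("Resolved FS for user: " + fs.getUri());
        }
    }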