#[IGNITE-218]: mapred execution in correct user context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d32feeb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d32feeb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d32feeb Branch: refs/heads/ignite-218 Commit: 4d32feeb0d206ce6edd10b7e649dca928804bd74 Parents: 1b7cead Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Fri May 29 21:01:35 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Fri May 29 21:01:35 2015 +0300 ---------------------------------------------------------------------- config/hadoop/default-config.xml | 3 + .../processors/hadoop/HadoopTaskContext.java | 11 ++ .../internal/processors/igfs/IgfsUtils.java | 4 - .../fs/IgniteHadoopFileSystemCounterWriter.java | 2 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 16 +-- .../processors/hadoop/HadoopDefaultJobInfo.java | 2 +- .../internal/processors/hadoop/HadoopUtils.java | 53 +++++++++ .../hadoop/SecondaryFileSystemProvider.java | 2 +- .../hadoop/fs/HadoopLazyConcurrentMap.java | 5 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 111 ++++++++++--------- .../processors/hadoop/v2/HadoopV2Job.java | 2 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 4 +- .../hadoop/v2/HadoopV2TaskContext.java | 51 +++++++++ .../hadoop/HadoopClientProtocolSelfTest.java | 6 +- .../HadoopDefaultMapReducePlannerSelfTest.java | 2 - .../processors/hadoop/HadoopMapReduceTest.java | 15 ++- .../hadoop/HadoopTasksAllVersionsTest.java | 15 ++- .../processors/hadoop/HadoopTasksV1Test.java | 5 +- .../processors/hadoop/HadoopTasksV2Test.java | 5 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 6 +- .../hadoop/examples/HadoopWordCount2.java | 24 +--- .../collections/HadoopAbstractMapTest.java | 16 +++ 22 files changed, 238 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/config/hadoop/default-config.xml ---------------------------------------------------------------------- diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml index 30413b0..c1e4855 100644 --- a/config/hadoop/default-config.xml +++ b/config/hadoop/default-config.xml @@ -90,6 +90,9 @@ Configuration of Ignite node. --> <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <!-- Temporary workaround for tests: --> + <property name="localHost" value="127.0.0.1"/> + <!-- Apache Hadoop Accelerator configuration. --> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java index 371fd81..47c55bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import java.util.*; +import java.util.concurrent.*; /** * Task context. @@ -187,4 +188,14 @@ public abstract class HadoopTaskContext { * @throws IgniteCheckedException If failed. */ public abstract void cleanupTaskEnvironment() throws IgniteCheckedException; + + /** + * + * @param user + * @param callable + * @param <T> + * @return + * @throws IgniteCheckedException + */ + public abstract <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java index 71a1dbe..8026a44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java @@ -92,11 +92,7 @@ public class IgfsUtils { } /** -<<<<<<< HEAD - * Provides non-null interned user name. -======= * Provides non-null user name. ->>>>>>> 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME}, * which is the current process owner user. * @param user a user name to be fixed. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index 821acdb..cb4f19b 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -54,7 +54,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs) throws IgniteCheckedException { - Configuration hadoopCfg = new Configuration(); + Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration(); for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet()) hadoopCfg.set(e.getKey(), e.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index c0a9ade..bbb8c5f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -179,14 +179,14 @@ public class IgniteHadoopFileSystem extends FileSystem { public static String getFsHadoopUser(Configuration cfg) throws IOException { String user = null; - // ------------------------------------------- - // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761 - // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect - // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct - // ugi.doAs() closure. - if (cfg != null) - user = cfg.get(MRJobConfig.USER_NAME); - // ------------------------------------------- +// // ------------------------------------------- +// // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761 +// // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect +// // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct +// // ugi.doAs() closure. +// if (cfg != null) +// user = cfg.get(MRJobConfig.USER_NAME); +// // ------------------------------------------- if (user == null) { UserGroupInformation currUgi = UserGroupInformation.getCurrentUser(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index d0a327e..2e855d0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. synchronized (HadoopDefaultJobInfo.class) { if ((jobCls0 = jobCls) == null) { - HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main"); + HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job"); jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index d493bd4..ca3a6c5 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -304,6 +304,59 @@ public class HadoopUtils { } /** + * + * @return + */ + static Configuration checkPreconditionAndCreateConfiguration() { + ClassLoader confCl = Configuration.class.getClassLoader(); + + ClassLoader ctxCl = Thread.currentThread().getContextClassLoader(); + + if (ctxCl != null && confCl != ctxCl) + throw new IllegalStateException("Wrong classloader: " + confCl + " != " + ctxCl); + + return new Configuration(); + } + + /** + * For diagnostic & test purposes. + * @param c the Configuration to check. + */ + static void checkConfiguration(Configuration c) { + String name = Configuration.class.getName(); + + c.set("xxx", name); + + Class clazz = c.getClass("xxx", null); + + c.unset("xxx"); + + if (clazz != c.getClass()) + throw new IllegalStateException("Wrong configuration."); + } + + /** + * + * @return + */ + public static Configuration safeCreateConfiguration() { + final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); + + Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader()); + + try { + Configuration c = checkPreconditionAndCreateConfiguration(); + + checkConfiguration(c); + + return c; + } + finally { + Thread.currentThread().setContextClassLoader(cl0); + } + } + + /** * Constructor. */ private HadoopUtils() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java index b1a057c..f9a68f1 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java @@ -34,7 +34,7 @@ import java.security.*; */ public class SecondaryFileSystemProvider { /** Configuration of the secondary filesystem, never null. */ - private final Configuration cfg = new Configuration(); + private final Configuration cfg = HadoopUtils.safeCreateConfiguration(); /** The secondary filesystem URI, never null. */ private final URI uri; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java index f1c10b6..71b38c4 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java @@ -18,13 +18,12 @@ package org.apache.ignite.internal.processors.hadoop.fs; import org.apache.ignite.*; +import org.apache.ignite.internal.util.future.*; import org.jsr166.*; +import java.io.*; import java.util.*; import java.util.concurrent.*; -import org.apache.ignite.internal.util.future.*; - -import java.io.*; import java.util.concurrent.locks.*; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index 1919fc4..ac141a9 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -17,21 +17,14 @@ package org.apache.ignite.internal.processors.hadoop.taskexecutor; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; -import java.security.*; import java.util.*; import java.util.concurrent.*; @@ -111,56 +104,76 @@ public abstract class HadoopRunnableTask implements Callable<Void> { user = IgfsUtils.fixUserName(user); - String ugiUser; - try { - UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); - - ugiUser = currUser.getShortUserName(); - } - catch (IOException ioe) { - throw new IgniteCheckedException(ioe); - } - - if (F.eq(user, ugiUser)) - // if current UGI context user is the same, do direct call: - return callImpl(); - else { - // do the call in the context of 'user': - try { - final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); - - UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); - - return ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override public Void run() throws IgniteCheckedException { - return callImpl(); - } - }); - } catch (IOException | InterruptedException e) { - throw new IgniteCheckedException(e); - } - } + return callImpl(user); + +// String ugiUser; +// try { +// UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); +// +// ugiUser = currUser.getShortUserName(); +// } +// catch (IOException ioe) { +// throw new IgniteCheckedException(ioe); +// } +// +// if (F.eq(user, ugiUser)) +// // if current UGI context user is the same, do direct call: +// return callImpl(); +// else { +// // do the call in the context of 'user': +// try { +// final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); +// +// UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); +// +// return ugi.doAs(new PrivilegedExceptionAction<Void>() { +// @Override public Void run() throws IgniteCheckedException { +// return callImpl(); +// } +// }); +// } catch (IOException | InterruptedException e) { +// throw new IgniteCheckedException(e); +// } +// } } +// /** +// * Gets the job property. +// */ +// private String getJobProperty(String key) { +// if (job instanceof HadoopV2Job) { +// Configuration conf = ((HadoopV2Job)job).jobConf(); +// +// return conf.get(key); +// } +// else +// return job.info().property(key); +// } + /** - * Gets the job property. + * Runnable task call implementation + * @return null. + * @throws IgniteCheckedException */ - private String getJobProperty(String key) { - if (job instanceof HadoopV2Job) { - Configuration conf = ((HadoopV2Job)job).jobConf(); + Void callImpl(final String user) throws IgniteCheckedException { + assert user != null; - return conf.get(key); - } - else - return job.info().property(key); + ctx = job.getTaskContext(info); + + return ctx.runAs(user, new Callable<Void>() { + @Override public Void call() throws Exception { + runTaskImpl(); + + return null; + } + }); } /** - * Runnable task call implementation - * @return null. + * Implements actual task running. * @throws IgniteCheckedException */ - Void callImpl() throws IgniteCheckedException { + void runTaskImpl() throws IgniteCheckedException { execStartTs = U.currentTimeMillis(); Throwable err = null; @@ -170,8 +183,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { HadoopPerformanceCounter perfCntr = null; try { - ctx = job.getTaskContext(info); - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); perfCntr.onTaskSubmit(info, submitTs); @@ -218,8 +229,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> { if (ctx != null) ctx.cleanupTaskEnvironment(); } - - return null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index a4a7b25..3d47960 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -205,7 +205,7 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); + "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber()); cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 7dcd10b..7bc0fb0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -163,7 +163,6 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { - // TODO: Out of bounds. try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) { if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce " + @@ -199,7 +198,8 @@ public class HadoopV2JobResourceManager { } } else if (!jobLocDir.mkdirs()) - throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); + throw new IgniteCheckedException("Failed to create local job directory: " + + jobLocDir.getAbsolutePath()); setLocalFSWorkingDirectory(jobLocDir); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 9086874..0bbe1d7 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -28,16 +28,20 @@ import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.security.*; import org.apache.ignite.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.v1.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; +import java.security.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -447,4 +451,51 @@ public class HadoopV2TaskContext extends HadoopTaskContext { throw new IgniteCheckedException(e); } } + + @Override public <T> T runAs(final String user, final Callable<T> callable) throws IgniteCheckedException { + String ugiUser; + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + + ugiUser = currUser.getShortUserName(); + } + catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + + try { + if (F.eq(user, ugiUser)) + // if current UGI context user is the same, do direct call: + return callable.call(); + else { + // do the call in the context of 'user': +// final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); +// + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + + return ugi.doAs(new PrivilegedExceptionAction<T>() { + @Override public T run() throws Exception { + return callable.call(); + } + }); + } + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + +// /** +// * Gets the job property. +// */ +// private String getJobProperty(String key) { +// if (job instanceof HadoopV2Job) { +// Configuration conf = ((HadoopV2Job)job).jobConf(); +// +// return conf.get(key); +// } +// else +// return job.info().property(key); +// } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java index b94d9d1..b9f8179 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java @@ -28,7 +28,6 @@ import org.apache.ignite.*; import org.apache.ignite.hadoop.mapreduce.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.proto.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { * @return Configuration. */ private Configuration config(int port) { - Configuration conf = new Configuration(); + Configuration conf = HadoopUtils.safeCreateConfiguration(); setupFileSystems(conf); @@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { ctx.getCounter(TestCounter.COUNTER2).increment(1); int sum = 0; - for (IntWritable value : values) { + for (IntWritable value : values) sum += value.get(); - } ctx.write(key, new IntWritable(sum)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java index 30babe3..f3c74fc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java @@ -1000,7 +1000,5 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes @Override public GridKernalContext context() { return null; } - - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 8a3a0ac..f96eb74 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.testframework.*; @@ -105,14 +106,16 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { checkJobStatistics(jobId); + final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + - useNewReducer, + useNewReducer, "blue\t200000\n" + - "green\t150000\n" + - "red\t100000\n" + - "yellow\t70000\n", - readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000") - ); + "green\t150000\n" + + "red\t100000\n" + + "yellow\t70000\n", + readAndSortFile(outFile) + ); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java index aaf0f92..6930020 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -22,7 +22,6 @@ import org.apache.hadoop.io.*; import org.apache.ignite.*; import org.apache.ignite.igfs.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.net.*; @@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Hadoop job. * @throws IOException If fails. */ - public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception; /** * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API @@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), igfs.info(inFile).length() - fileBlock1.length()); - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); @@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context with mock output. * @throws IgniteCheckedException If fails. */ - private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, + private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType, int taskNum, String... words) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); @@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testReduceTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); @@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @throws Exception If fails. */ public void testCombinerTask() throws Exception { - HadoopV2Job gridJob = getHadoopJob("/", "/"); + HadoopJob gridJob = getHadoopJob("/", "/"); HadoopTestTaskContext ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); @@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { * @return Context of combine task with mock output. * @throws IgniteCheckedException If fails. */ - private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) + private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob) throws IgniteCheckedException { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); @@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l); HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index b41a260..48e83cc 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop; import org.apache.hadoop.mapred.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.io.*; import java.util.*; @@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws IOException If fails. */ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile); setupFileSystems(jobConf); @@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index b677c63..e73fae3 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.ignite.internal.processors.hadoop.examples.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; import java.util.*; @@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { * @return Hadoop job. * @throws Exception if fails. */ - @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception { Job job = Job.getInstance(); job.setOutputKeyClass(Text.class); @@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0); - return new HadoopV2Job(jobId, jobInfo, log); + return jobInfo.createJob(jobId, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index ebc89f4..f3b9307 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + HadoopDefaultJobInfo info = createJobInfo(cfg); + + HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1); + + HadoopJob job = info.createJob(id, log); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java index 8637a4e..dc68df7 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java @@ -17,13 +17,11 @@ package org.apache.ignite.internal.processors.hadoop.examples; -import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.internal.processors.hadoop.*; import java.io.*; @@ -46,24 +44,6 @@ public class HadoopWordCount2 { Job job = getJob(args[0], args[1]); job.submit(); - - printCounters(job); - - job.waitForCompletion(true); - - printCounters(job); - } - - private static void printCounters(Job job) throws IOException { - Counters counters = job.getCounters(); - - for (CounterGroup group : counters) { - System.out.println("Group: " + group.getDisplayName() + "," + group.getName()); - System.out.println(" number of counters: " + group.size()); - for (Counter counter : group) { - System.out.println(" - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue()); - } - } } /** @@ -75,9 +55,7 @@ public class HadoopWordCount2 { * @throws IOException If fails. */ public static Job getJob(String input, String output) throws IOException { - Configuration cfg = HadoopStartup.configuration(); - - Job job = Job.getInstance(cfg); + Job job = Job.getInstance(); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index b4ed5e1..c894b76 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; /** * Abstract class for maps test. @@ -98,6 +99,21 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { assert false; } + + /** + * @param user + * @param callable + * @return + * @throws IgniteCheckedException + */ + @Override public <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException { + try { + return callable.call(); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } } /**