# IGNITE-386: WIP on internal namings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ace354c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ace354c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ace354c6

Branch: refs/heads/ignite-386
Commit: ace354c6cfe9293a786ad3c7aaea94ab6abcfd0b
Parents: 7d46deb
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Tue Mar 3 15:43:58 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Tue Mar 3 15:43:58 2015 +0300

----------------------------------------------------------------------
 bin/setup-hadoop.bat | 2 +-
 bin/setup-hadoop.sh | 2 +-
 .../hadoop/GridHadoopClassLoader.java | 552 ------
 .../processors/hadoop/GridHadoopComponent.java | 61 -
 .../processors/hadoop/GridHadoopContext.java | 196 ---
 .../hadoop/GridHadoopDefaultJobInfo.java | 163 --
 .../processors/hadoop/GridHadoopImpl.java | 132 --
 .../processors/hadoop/GridHadoopSetup.java | 505 ------
 .../GridHadoopTaskCancelledException.java | 35 -
 .../processors/hadoop/GridHadoopUtils.java | 308 ----
 .../processors/hadoop/HadoopClassLoader.java | 552 ++++++
 .../processors/hadoop/HadoopComponent.java | 61 +
 .../processors/hadoop/HadoopContext.java | 196 +++
 .../processors/hadoop/HadoopCounterGroup.java | 2 +-
 .../processors/hadoop/HadoopDefaultJobInfo.java | 163 ++
 .../internal/processors/hadoop/HadoopImpl.java | 132 ++
 .../internal/processors/hadoop/HadoopSetup.java | 505 ++++++
 .../hadoop/HadoopTaskCancelledException.java | 35 +
 .../internal/processors/hadoop/HadoopUtils.java | 308 ++++
 .../hadoop/IgniteHadoopProcessor.java | 32 +-
 .../counter/GridHadoopFSCounterWriter.java | 2 +-
 .../counter/GridHadoopPerformanceCounter.java | 2 +-
 .../hadoop/jobtracker/GridHadoopJobTracker.java | 1625 ------------------
 .../hadoop/jobtracker/HadoopJobTracker.java | 1625 ++++++++++++++++++
 .../proto/GridHadoopProtocolSubmitJobTask.java | 2 +-
 .../hadoop/proto/HadoopClientProtocol.java | 6 +-
 .../hadoop/shuffle/GridHadoopShuffle.java | 256 ---
 .../hadoop/shuffle/HadoopShuffle.java | 256 +++
 .../GridHadoopEmbeddedTaskExecutor.java | 146 --
 .../taskexecutor/GridHadoopRunnableTask.java | 4 +-
 .../GridHadoopTaskExecutorAdapter.java | 57 -
 .../HadoopEmbeddedTaskExecutor.java | 146 ++
 .../taskexecutor/HadoopTaskExecutorAdapter.java | 57 +
 .../GridHadoopExternalTaskExecutor.java | 960 -----------
 .../external/HadoopExternalTaskExecutor.java | 960 +++++++++++
 .../hadoop/v1/GridHadoopV1MapTask.java | 2 +-
 .../hadoop/v1/GridHadoopV1ReduceTask.java | 2 +-
 .../hadoop/v1/GridHadoopV1Splitter.java | 2 +-
 .../processors/hadoop/v1/GridHadoopV1Task.java | 2 +-
 .../hadoop/v2/GridHadoopV2Context.java | 8 +-
 .../processors/hadoop/v2/GridHadoopV2Job.java | 8 +-
 .../hadoop/v2/GridHadoopV2Splitter.java | 2 +-
 .../hadoop/v2/GridHadoopV2TaskContext.java | 4 +-
 .../apache/ignite/igfs/IgfsEventsTestSuite.java | 4 +-
 .../hadoop/GridHadoopClassLoaderTest.java | 69 -
 .../hadoop/GridHadoopCommandLineTest.java | 4 +-
 ...idHadoopDefaultMapReducePlannerSelfTest.java | 2 +-
 .../hadoop/GridHadoopGroupingTest.java | 2 +-
 .../hadoop/GridHadoopJobTrackerSelfTest.java | 2 +-
 .../GridHadoopMapReduceEmbeddedSelfTest.java | 2 +-
 .../hadoop/GridHadoopMapReduceTest.java | 2 +-
 .../hadoop/GridHadoopSortingTest.java | 2 +-
 .../hadoop/GridHadoopSplitWrapperSelfTest.java | 4 +-
 .../hadoop/GridHadoopTaskExecutionSelfTest.java | 4 +-
 .../hadoop/GridHadoopTasksV1Test.java | 4 +-
 .../hadoop/GridHadoopTasksV2Test.java | 4 +-
 .../hadoop/GridHadoopTestTaskContext.java | 2 +-
 .../hadoop/GridHadoopV2JobSelfTest.java | 2 +-
 .../hadoop/HadoopClassLoaderTest.java | 69 +
 ...GridHadoopExternalTaskExecutionSelfTest.java | 2 +-
 .../testsuites/IgniteHadoopTestSuite.java | 2 +-
 .../IgniteIgfsLinuxAndMacOSTestSuite.java | 2 +-
 62 files changed, 5130 insertions(+), 5130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/bin/setup-hadoop.bat
----------------------------------------------------------------------
diff --git a/bin/setup-hadoop.bat b/bin/setup-hadoop.bat
index c4c73b3..a11ef8c 100644
--- a/bin/setup-hadoop.bat
+++ b/bin/setup-hadoop.bat
@@ -23,6 +23,6 @@ if "%OS%" == "Windows_NT" setlocal
 
-set MAIN_CLASS=org.apache.ignite.hadoop.GridHadoopSetup
+set MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.HadoopSetup
 
 call "%~dp0\ignite.bat" %*

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/bin/setup-hadoop.sh
----------------------------------------------------------------------
diff --git a/bin/setup-hadoop.sh b/bin/setup-hadoop.sh
index 8969dfa..d66353f 100755
--- a/bin/setup-hadoop.sh
+++ b/bin/setup-hadoop.sh
@@ -54,7 +54,7 @@ setIgniteHome
 
 #
 # Set utility environment.
 #
-export MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.GridHadoopSetup
+export MAIN_CLASS=org.apache.ignite.internal.processors.hadoop.HadoopSetup
 
 #
 # Start utility.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java
deleted file mode 100644
index bc4c0bb..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopClassLoader.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-import org.objectweb.asm.*;
-import org.objectweb.asm.commons.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Class loader allowing explicitly load classes without delegation to parent class loader.
- * Also supports class parsing for finding dependencies which contain transitive dependencies - * unavailable for parent. - */ -public class GridHadoopClassLoader extends URLClassLoader { - /** - * We are very parallel capable. - */ - static { - registerAsParallelCapable(); - } - - /** */ - private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)GridHadoopClassLoader.class.getClassLoader(); - - /** */ - private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs()); - - /** */ - private static volatile Collection<URL> hadoopJars; - - /** */ - private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>(); - - /** */ - private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); - - /** - * @param urls Urls. - */ - public GridHadoopClassLoader(URL[] urls) { - super(addHadoopUrls(urls), APP_CLS_LDR); - - assert !(getParent() instanceof GridHadoopClassLoader); - } - - /** - * Need to parse only Ignite Hadoop and IGFS classes. - * - * @param cls Class name. - * @return {@code true} if we need to check this class. - */ - private static boolean isIgfsHadoop(String cls) { - String ignitePackagePrefix = "org.apache.ignite"; - int len = ignitePackagePrefix.length(); - - return cls.startsWith(ignitePackagePrefix) && (cls.indexOf("igfs.", len) != -1 || cls.indexOf(".fs.", len) != -1 || cls.indexOf("hadoop.", len) != -1); - } - - /** - * @param cls Class name. - * @return {@code true} If this is Hadoop class. - */ - private static boolean isHadoop(String cls) { - return cls.startsWith("org.apache.hadoop."); - } - - /** {@inheritDoc} */ - @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - try { - if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. - if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks. - return loadFromBytes(name, GridHadoopShutdownHookManager.class.getName()); - else if (name.endsWith(".util.NativeCodeLoader")) - return loadFromBytes(name, GridHadoopNativeCodeLoader.class.getName()); - - return loadClassExplicitly(name, resolve); - } - - if (isIgfsHadoop(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. - Boolean hasDeps = cache.get(name); - - if (hasDeps == null) { - hasDeps = hasExternalDependencies(name, new HashSet<String>()); - - cache.put(name, hasDeps); - } - - if (hasDeps) - return loadClassExplicitly(name, resolve); - } - - return super.loadClass(name, resolve); - } - catch (NoClassDefFoundError | ClassNotFoundException e) { - throw new ClassNotFoundException("Failed to load class: " + name, e); - } - } - - /** - * @param name Name. - * @param replace Replacement. - * @return Class. 
- */ - private Class<?> loadFromBytes(final String name, final String replace) { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c != null) - return c; - - byte[] bytes = bytesCache.get(name); - - if (bytes == null) { - InputStream in = loadClassBytes(getParent(), replace); - - ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - ClassWriter w = new ClassWriter(Opcodes.ASM4); - - rdr.accept(new RemappingClassAdapter(w, new Remapper() { - /** */ - String replaceType = replace.replace('.', '/'); - - /** */ - String nameType = name.replace('.', '/'); - - @Override public String map(String type) { - if (type.equals(replaceType)) - return nameType; - - return type; - } - }), ClassReader.EXPAND_FRAMES); - - bytes = w.toByteArray(); - - bytesCache.put(name, bytes); - } - - return defineClass(name, bytes, 0, bytes.length); - } - } - - /** - * @param name Class name. - * @param resolve Resolve class. - * @return Class. - * @throws ClassNotFoundException If failed. - */ - private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c == null) { - long t1 = System.nanoTime(); - - c = findClass(name); - - // this is the defining class loader; record the stats - sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); - sun.misc.PerfCounter.getFindClasses().increment(); - } - - if (resolve) - resolveClass(c); - - return c; - } - } - - /** - * @param ldr Loader. - * @param clsName Class. - * @return Input stream. - */ - @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) { - return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); - } - - /** - * @param clsName Class name. - * @return {@code true} If the class has external dependencies. - */ - boolean hasExternalDependencies(final String clsName, final Set<String> visited) { - if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. - return true; - - // Try to get from parent to check if the type accessible. - InputStream in = loadClassBytes(getParent(), clsName); - - if (in == null) // The class is external itself, it must be loaded from this class loader. - return true; - - if (!isIgfsHadoop(clsName)) // Other classes should not have external dependencies. 
- return false; - - final ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException("Failed to read class: " + clsName, e); - } - - visited.add(clsName); - - final AtomicBoolean hasDeps = new AtomicBoolean(); - - rdr.accept(new ClassVisitor(Opcodes.ASM4) { - AnnotationVisitor av = new AnnotationVisitor(Opcodes.ASM4) { - // TODO - }; - - FieldVisitor fv = new FieldVisitor(Opcodes.ASM4) { - @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) { - onType(desc); - - return av; - } - }; - - MethodVisitor mv = new MethodVisitor(Opcodes.ASM4) { - @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) { - onType(desc); - - return av; - } - - @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { - onType(desc); - - return av; - } - - @Override public AnnotationVisitor visitAnnotationDefault() { - return av; - } - - @Override public void visitFieldInsn(int i, String owner, String name, String desc) { - onType(owner); - onType(desc); - } - - @Override public void visitFrame(int i, int i2, Object[] locTypes, int i3, Object[] stackTypes) { - for (Object o : locTypes) { - if (o instanceof String) - onType((String)o); - } - - for (Object o : stackTypes) { - if (o instanceof String) - onType((String)o); - } - } - - @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, - Label lb2, int i) { - onType(desc); - } - - @Override public void visitMethodInsn(int i, String owner, String name, String desc) { - onType(owner); - } - - @Override public void visitMultiANewArrayInsn(String desc, int dim) { - onType(desc); - } - - @Override public void visitTryCatchBlock(Label lb, Label lb2, Label lb3, String e) { - onType(e); - } - }; - - void onClass(String depCls) { - assert validateClassName(depCls) : depCls; - - if (depCls.startsWith("java.")) // Filter out platform classes. - return; - - if (visited.contains(depCls)) - return; - - Boolean res = cache.get(depCls); - - if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, visited))) - hasDeps.set(true); - } - - void onType(String type) { - if (type == null) - return; - - int off = 0; - - while (type.charAt(off) == '[') - off++; // Handle arrays. - - if (off != 0) - type = type.substring(off); - - if (type.length() == 1) - return; // Get rid of primitives. - - if (type.charAt(type.length() - 1) == ';') { - assert type.charAt(0) == 'L' : type; - - type = type.substring(1, type.length() - 1); - } - - type = type.replace('/', '.'); - - onClass(type); - } - - @Override public void visit(int i, int i2, String name, String signature, String superName, - String[] ifaces) { - onType(superName); - - if (ifaces != null) { - for (String iface : ifaces) - onType(iface); - } - } - - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - onType(desc); - - return av; - } - - @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { - onType(name); - } - - @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { - onType(desc); - - return fv; - } - - @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, - String[] exceptions) { - if (exceptions != null) { - for (String e : exceptions) - onType(e); - } - - return mv; - } - }, 0); - - if (hasDeps.get()) // We already know that we have dependencies, no need to check parent. 
- return true; - - // Here we are known to not have any dependencies but possibly we have a parent which have them. - int idx = clsName.lastIndexOf('$'); - - if (idx == -1) // No parent class. - return false; - - String parentCls = clsName.substring(0, idx); - - if (visited.contains(parentCls)) - return false; - - Boolean res = cache.get(parentCls); - - if (res == null) - res = hasExternalDependencies(parentCls, visited); - - return res; - } - - /** - * @param name Class name. - * @return {@code true} If this is a valid class name. - */ - private static boolean validateClassName(String name) { - int len = name.length(); - - if (len <= 1) - return false; - - if (!Character.isJavaIdentifierStart(name.charAt(0))) - return false; - - boolean hasDot = false; - - for (int i = 1; i < len; i++) { - char c = name.charAt(i); - - if (c == '.') - hasDot = true; - else if (!Character.isJavaIdentifierPart(c)) - return false; - } - - return hasDot; - } - - /** - * @param name Variable name. - * @param dflt Default. - * @return Value. - */ - private static String getEnv(String name, String dflt) { - String res = System.getProperty(name); - - if (F.isEmpty(res)) - res = System.getenv(name); - - return F.isEmpty(res) ? dflt : res; - } - - /** - * @param res Result. - * @param dir Directory. - * @param startsWith Starts with prefix. - * @throws MalformedURLException If failed. - */ - private static void addUrls(Collection<URL> res, File dir, final String startsWith) throws Exception { - File[] files = dir.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return startsWith == null || name.startsWith(startsWith); - } - }); - - if (files == null) - throw new IOException("Path is not a directory: " + dir); - - for (File file : files) - res.add(file.toURI().toURL()); - } - - /** - * @param urls URLs. - * @return URLs. - */ - private static URL[] addHadoopUrls(URL[] urls) { - Collection<URL> hadoopJars; - - try { - hadoopJars = hadoopUrls(); - } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); - } - - ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length)); - - list.addAll(appJars); - list.addAll(hadoopJars); - - if (!F.isEmpty(urls)) - list.addAll(F.asList(urls)); - - return list.toArray(new URL[list.size()]); - } - - /** - * @return HADOOP_HOME Variable. - */ - @Nullable public static String hadoopHome() { - return getEnv("HADOOP_PREFIX", getEnv("HADOOP_HOME", null)); - } - - /** - * @return Collection of jar URLs. - * @throws IgniteCheckedException If failed. - */ - public static Collection<URL> hadoopUrls() throws IgniteCheckedException { - Collection<URL> hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - synchronized (GridHadoopClassLoader.class) { - hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - hadoopUrls = new ArrayList<>(); - - String hadoopPrefix = hadoopHome(); - - if (F.isEmpty(hadoopPrefix)) - throw new IgniteCheckedException("Failed resolve Hadoop installation location. 
Either HADOOP_PREFIX or " + - "HADOOP_HOME environment variables must be set."); - - String commonHome = getEnv("HADOOP_COMMON_HOME", hadoopPrefix + "/share/hadoop/common"); - String hdfsHome = getEnv("HADOOP_HDFS_HOME", hadoopPrefix + "/share/hadoop/hdfs"); - String mapredHome = getEnv("HADOOP_MAPRED_HOME", hadoopPrefix + "/share/hadoop/mapreduce"); - - try { - addUrls(hadoopUrls, new File(commonHome + "/lib"), null); - addUrls(hadoopUrls, new File(hdfsHome + "/lib"), null); - addUrls(hadoopUrls, new File(mapredHome + "/lib"), null); - - addUrls(hadoopUrls, new File(hdfsHome), "hadoop-hdfs-"); - - addUrls(hadoopUrls, new File(commonHome), "hadoop-common-"); - addUrls(hadoopUrls, new File(commonHome), "hadoop-auth-"); - addUrls(hadoopUrls, new File(commonHome + "/lib"), "hadoop-auth-"); - - addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-common"); - addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-core"); - } - catch (Exception e) { - throw new IgniteCheckedException(e); - } - - hadoopJars = hadoopUrls; - - return hadoopUrls; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java deleted file mode 100644 index 337bfe9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopComponent.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * Abstract class for all hadoop components. - */ -public abstract class GridHadoopComponent { - /** Hadoop context. */ - protected GridHadoopContext ctx; - - /** Logger. */ - protected IgniteLogger log; - - /** - * @param ctx Hadoop context. - */ - public void start(GridHadoopContext ctx) throws IgniteCheckedException { - this.ctx = ctx; - - log = ctx.kernalContext().log(getClass()); - } - - /** - * Stops manager. - */ - public void stop(boolean cancel) { - // No-op. - } - - /** - * Callback invoked when all grid components are started. - */ - public void onKernalStart() throws IgniteCheckedException { - // No-op. - } - - /** - * Callback invoked before all grid components are stopped. - */ - public void onKernalStop(boolean cancel) { - // No-op. 
- } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java deleted file mode 100644 index 3160e3d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopContext.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Hadoop accelerator context. - */ -public class GridHadoopContext { - /** Kernal context. */ - private GridKernalContext ctx; - - /** Hadoop configuration. */ - private GridHadoopConfiguration cfg; - - /** Job tracker. */ - private GridHadoopJobTracker jobTracker; - - /** External task executor. */ - private GridHadoopTaskExecutorAdapter taskExecutor; - - /** */ - private GridHadoopShuffle shuffle; - - /** Managers list. */ - private List<GridHadoopComponent> components = new ArrayList<>(); - - /** - * @param ctx Kernal context. - */ - public GridHadoopContext( - GridKernalContext ctx, - GridHadoopConfiguration cfg, - GridHadoopJobTracker jobTracker, - GridHadoopTaskExecutorAdapter taskExecutor, - GridHadoopShuffle shuffle - ) { - this.ctx = ctx; - this.cfg = cfg; - - this.jobTracker = add(jobTracker); - this.taskExecutor = add(taskExecutor); - this.shuffle = add(shuffle); - } - - /** - * Gets list of managers. - * - * @return List of managers. - */ - public List<GridHadoopComponent> components() { - return components; - } - - /** - * Gets kernal context. - * - * @return Grid kernal context instance. - */ - public GridKernalContext kernalContext() { - return ctx; - } - - /** - * Gets Hadoop configuration. - * - * @return Hadoop configuration. - */ - public GridHadoopConfiguration configuration() { - return cfg; - } - - /** - * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}. - * - * @return Local node ID. - */ - public UUID localNodeId() { - return ctx.localNodeId(); - } - - /** - * Gets local node order. - * - * @return Local node order. 
- */ - public long localNodeOrder() { - assert ctx.discovery() != null; - - return ctx.discovery().localNode().order(); - } - - /** - * @return Hadoop-enabled nodes. - */ - public Collection<ClusterNode> nodes() { - return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersion()); - } - - /** - * @return {@code True} if - */ - public boolean jobUpdateLeader() { - long minOrder = Long.MAX_VALUE; - ClusterNode minOrderNode = null; - - for (ClusterNode node : nodes()) { - if (node.order() < minOrder) { - minOrder = node.order(); - minOrderNode = node; - } - } - - assert minOrderNode != null; - - return localNodeId().equals(minOrderNode.id()); - } - - /** - * @param meta Job metadata. - * @return {@code true} If local node is participating in job execution. - */ - public boolean isParticipating(GridHadoopJobMetadata meta) { - UUID locNodeId = localNodeId(); - - if (locNodeId.equals(meta.submitNodeId())) - return true; - - GridHadoopMapReducePlan plan = meta.mapReducePlan(); - - return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); - } - - /** - * @return Jon tracker instance. - */ - public GridHadoopJobTracker jobTracker() { - return jobTracker; - } - - /** - * @return Task executor. - */ - public GridHadoopTaskExecutorAdapter taskExecutor() { - return taskExecutor; - } - - /** - * @return Shuffle. - */ - public GridHadoopShuffle shuffle() { - return shuffle; - } - - /** - * @return Map-reduce planner. - */ - public GridHadoopMapReducePlanner planner() { - return cfg.getMapReducePlanner(); - } - - /** - * Adds component. - * - * @param c Component to add. - * @return Added manager. - */ - private <C extends GridHadoopComponent> C add(C c) { - components.add(c); - - return c; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java deleted file mode 100644 index 555c573..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopDefaultJobInfo.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.lang.reflect.*; -import java.util.*; - -/** - * Hadoop job info based on default Hadoop configuration. - */ -public class GridHadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { - /** */ - private static final long serialVersionUID = 5489900236464999951L; - - /** {@code true} If job has combiner. */ - private boolean hasCombiner; - - /** Number of reducers configured for job. */ - private int numReduces; - - /** Configuration. */ - private Map<String,String> props = new HashMap<>(); - - /** Job name. */ - private String jobName; - - /** User name. */ - private String user; - - /** */ - private static volatile Class<?> jobCls; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopDefaultJobInfo() { - // No-op. - } - - /** - * Constructor. - * - * @param jobName Job name. - * @param user User name. - * @param hasCombiner {@code true} If job has combiner. - * @param numReduces Number of reducers configured for job. - * @param props All other properties of the job. - */ - public GridHadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces, - Map<String, String> props) { - this.jobName = jobName; - this.user = user; - this.hasCombiner = hasCombiner; - this.numReduces = numReduces; - this.props = props; - } - - /** {@inheritDoc} */ - @Nullable @Override public String property(String name) { - return props.get(name); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { - try { - Class<?> jobCls0 = jobCls; - - if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. - synchronized (GridHadoopDefaultJobInfo.class) { - if ((jobCls0 = jobCls) == null) { - GridHadoopClassLoader ldr = new GridHadoopClassLoader(null); - - jobCls = jobCls0 = ldr.loadClass(GridHadoopV2Job.class.getName()); - } - } - } - - Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, GridHadoopDefaultJobInfo.class, - IgniteLogger.class); - - return (GridHadoopJob)constructor.newInstance(jobId, this, log); - } - // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. 
- catch (Throwable t) { - throw new IgniteCheckedException(t); - } - } - - /** {@inheritDoc} */ - @Override public boolean hasCombiner() { - return hasCombiner; - } - - /** {@inheritDoc} */ - @Override public boolean hasReducer() { - return reducers() > 0; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - return numReduces; - } - - /** {@inheritDoc} */ - @Override public String jobName() { - return jobName; - } - - /** {@inheritDoc} */ - @Override public String user() { - return user; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, jobName); - U.writeString(out, user); - - out.writeBoolean(hasCombiner); - out.writeInt(numReduces); - - U.writeStringMap(out, props); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobName = U.readString(in); - user = U.readString(in); - - hasCombiner = in.readBoolean(); - numReduces = in.readInt(); - - props = U.readStringMap(in); - } - - /** - * @return Properties of the job. - */ - public Map<String, String> properties() { - return props; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java deleted file mode 100644 index 55e3690..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopImpl.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.jetbrains.annotations.*; - -/** - * Hadoop facade implementation. - */ -public class GridHadoopImpl implements GridHadoop { - /** Hadoop processor. */ - private final IgniteHadoopProcessor proc; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** - * Constructor. - * - * @param proc Hadoop processor. 
- */ - GridHadoopImpl(IgniteHadoopProcessor proc) { - this.proc = proc; - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration configuration() { - return proc.config(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId nextJobId() { - if (busyLock.enterBusy()) { - try { - return proc.nextJobId(); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get next job ID (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { - if (busyLock.enterBusy()) { - try { - return proc.submit(jobId, jobInfo); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to submit job (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.status(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job status (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.counters(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job counters (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Nullable @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.finishFuture(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to get job finish future (grid is stopping)."); - } - - /** {@inheritDoc} */ - @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { - if (busyLock.enterBusy()) { - try { - return proc.kill(jobId); - } - finally { - busyLock.leaveBusy(); - } - } - else - throw new IllegalStateException("Failed to kill job (grid is stopping)."); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java deleted file mode 100644 index 66b1db4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSetup.java +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.net.*; -import java.nio.file.*; -import java.text.*; -import java.util.*; - -import static org.apache.ignite.internal.IgniteVersionUtils.*; - -/** - * Setup tool to configure Hadoop client. - */ -public class GridHadoopSetup { - /** */ - public static final String WINUTILS_EXE = "winutils.exe"; - - /** */ - private static final FilenameFilter IGNITE_JARS = new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("ignite-") && name.endsWith(".jar"); - } - }; - - /** - * The main method. - * @param ignore Params. - */ - public static void main(String[] ignore) { - X.println( - " __________ ________________ ", - " / _/ ___/ |/ / _/_ __/ __/ ", - " _/ // (_ / // / / / / _/ ", - "/___/\\___/_/|_/___/ /_/ /___/ ", - " for Apache Hadoop ", - " "); - - println("Version " + ACK_VER_STR); - - configureHadoop(); - } - - /** - * This operation prepares the clean unpacked Hadoop distributive to work as client with Ignite-Hadoop. - * It performs these operations: - * <ul> - * <li>Check for setting of HADOOP_HOME environment variable.</li> - * <li>Try to resolve HADOOP_COMMON_HOME or evaluate it relative to HADOOP_HOME.</li> - * <li>In Windows check if winutils.exe exists and try to fix issue with some restrictions.</li> - * <li>In Windows check new line character issues in CMD scripts.</li> - * <li>Scan Hadoop lib directory to detect Ignite JARs. If these don't exist tries to create ones.</li> - * </ul> - */ - private static void configureHadoop() { - String igniteHome = U.getIgniteHome(); - - println("IGNITE_HOME is set to '" + igniteHome + "'."); - - checkIgniteHome(igniteHome); - - String homeVar = "HADOOP_HOME"; - String hadoopHome = System.getenv(homeVar); - - if (F.isEmpty(hadoopHome)) { - homeVar = "HADOOP_PREFIX"; - hadoopHome = System.getenv(homeVar); - } - - if (F.isEmpty(hadoopHome)) - exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " + - "valid Hadoop installation directory and run setup tool again.", null); - - hadoopHome = hadoopHome.replaceAll("\"", ""); - - println(homeVar + " is set to '" + hadoopHome + "'."); - - String hiveHome = System.getenv("HIVE_HOME"); - - if (!F.isEmpty(hiveHome)) { - hiveHome = hiveHome.replaceAll("\"", ""); - - println("HIVE_HOME is set to '" + hiveHome + "'."); - } - - File hadoopDir = new File(hadoopHome); - - if (!hadoopDir.exists()) - exit("Hadoop installation folder does not exist.", null); - - if (!hadoopDir.isDirectory()) - exit("HADOOP_HOME must point to a directory.", null); - - if (!hadoopDir.canRead()) - exit("Hadoop installation folder can not be read. 
Please check permissions.", null); - - File hadoopCommonDir; - - String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME"); - - if (F.isEmpty(hadoopCommonHome)) { - hadoopCommonDir = new File(hadoopDir, "share/hadoop/common"); - - println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'."); - } - else { - println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'."); - - hadoopCommonDir = new File(hadoopCommonHome); - } - - if (!hadoopCommonDir.canRead()) - exit("Failed to read Hadoop common dir in '" + hadoopCommonHome + "'.", null); - - File hadoopCommonLibDir = new File(hadoopCommonDir, "lib"); - - if (!hadoopCommonLibDir.canRead()) - exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null); - - if (U.isWindows()) { - checkJavaPathSpaces(); - - File hadoopBinDir = new File(hadoopDir, "bin"); - - if (!hadoopBinDir.canRead()) - exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null); - - File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE); - - if (!winutilsFile.exists()) { - if (ask("File '" + WINUTILS_EXE + "' does not exist. " + - "It may be replaced by a stub. Create it?")) { - println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'."); - - boolean ok = false; - - try { - ok = winutilsFile.createNewFile(); - } - catch (IOException ignore) { - // No-op. - } - - if (!ok) - exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null); - } - else - println("Ok. But Hadoop client probably will not work on Windows this way..."); - } - - processCmdFiles(hadoopDir, "bin", "sbin", "libexec"); - } - - File igniteLibs = new File(new File(igniteHome), "libs"); - - if (!igniteLibs.exists()) - exit("Ignite 'libs' folder is not found.", null); - - Collection<File> jarFiles = new ArrayList<>(); - - addJarsInFolder(jarFiles, igniteLibs); - addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop")); - - boolean jarsLinksCorrect = true; - - for (File file : jarFiles) { - File link = new File(hadoopCommonLibDir, file.getName()); - - jarsLinksCorrect &= isJarLinkCorrect(link, file); - - if (!jarsLinksCorrect) - break; - } - - if (!jarsLinksCorrect) { - if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " + - "Create appropriate symbolic links?")) { - File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); - - if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + - "installation. They must be deleted to continue. Continue?")) { - for (File file : oldIgniteJarFiles) { - println("Deleting file '" + file.getAbsolutePath() + "'."); - - if (!file.delete()) - exit("Failed to delete file '" + file.getPath() + "'.", null); - } - } - - for (File file : jarFiles) { - File targetFile = new File(hadoopCommonLibDir, file.getName()); - - try { - println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); - - Files.createSymbolicLink(targetFile.toPath(), file.toPath()); - } - catch (IOException e) { - if (U.isWindows()) { - warn("Ability to create symbolic links is required!"); - warn("On Windows platform you have to grant permission 'Create symbolic links'"); - warn("to your user or run the Accelerator as Administrator."); - } - - exit("Creating symbolic link failed! Check permissions.", e); - } - } - } - else - println("Ok. 
But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); - } - - File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); - - File igniteDocs = new File(igniteHome, "docs"); - - if (!igniteDocs.canRead()) - exit("Failed to read Ignite 'docs' folder at '" + igniteDocs.getAbsolutePath() + "'.", null); - - if (hadoopEtc.canWrite()) { // TODO Bigtop - if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + - "(existing files will be backed up)?")) { - replaceWithBackup(new File(igniteDocs, "core-site.ignite.xml"), new File(hadoopEtc, "core-site.xml")); - - replaceWithBackup(new File(igniteDocs, "mapred-site.ignite.xml"), new File(hadoopEtc, "mapred-site.xml")); - } - else - println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory..."); - } - - if (!F.isEmpty(hiveHome)) { - File hiveConfDir = new File(hiveHome + File.separator + "conf"); - - if (!hiveConfDir.canWrite()) - warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " + - "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); - else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) - replaceWithBackup(new File(igniteDocs, "hive-site.ignite.xml"), new File(hiveConfDir, "hive-site.xml")); - else - println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); - } - - println("Apache Hadoop setup is complete."); - } - - /** - * @param jarFiles Jars. - * @param folder Folder. - */ - private static void addJarsInFolder(Collection<File> jarFiles, File folder) { - if (!folder.exists()) - exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null); - - jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS))); - } - - /** - * Checks that JAVA_HOME does not contain space characters. - */ - private static void checkJavaPathSpaces() { - String javaHome = System.getProperty("java.home"); - - if (javaHome.contains(" ")) { - warn("Java installation path contains space characters!"); - warn("Hadoop client will not be able to start using '" + javaHome + "'."); - warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation."); - } - } - - /** - * Checks Ignite home. - * - * @param igniteHome Ignite home. - */ - private static void checkIgniteHome(String igniteHome) { - URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation(); - - try { - Path jar = Paths.get(jarUrl.toURI()); - Path igHome = Paths.get(igniteHome); - - if (!jar.startsWith(igHome)) - exit("Ignite JAR files are not under IGNITE_HOME.", null); - } - catch (Exception e) { - exit(e.getMessage(), e); - } - } - - /** - * Replaces target file with source file. - * - * @param from From. - * @param to To. - */ - private static void replaceWithBackup(File from, File to) { - if (!from.canRead()) - exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null); - - println("Replacing file '" + to.getAbsolutePath() + "'."); - - try { - U.copy(from, renameToBak(to), true); - } - catch (IOException e) { - exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e); - } - } - - /** - * Renames file for backup. - * - * @param file File. - * @return File. 
- */ - private static File renameToBak(File file) { - DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); - - if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak"))) - exit("Failed to rename file '" + file.getPath() + "'.", null); - - return file; - } - - /** - * Checks if link is correct. - * - * @param link Symbolic link. - * @param correctTarget Correct link target. - * @return {@code true} If link target is correct. - */ - private static boolean isJarLinkCorrect(File link, File correctTarget) { - if (!Files.isSymbolicLink(link.toPath())) - return false; // It is a real file or it does not exist. - - Path target = null; - - try { - target = Files.readSymbolicLink(link.toPath()); - } - catch (IOException e) { - exit("Failed to read symbolic link: " + link.getAbsolutePath(), e); - } - - return Files.exists(target) && target.toFile().equals(correctTarget); - } - - /** - * Writes the question end read the boolean answer from the console. - * - * @param question Question to write. - * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise. - */ - private static boolean ask(String question) { - X.println(); - X.print(" < " + question + " (Y/N): "); - - String answer = null; - - if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES"))) - answer = "Y"; - else { - BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); - - try { - answer = br.readLine(); - } - catch (IOException e) { - exit("Failed to read answer: " + e.getMessage(), e); - } - } - - if (answer != null && "Y".equals(answer.toUpperCase().trim())) { - X.println(" > Yes."); - - return true; - } - else { - X.println(" > No."); - - return false; - } - } - - /** - * Exit with message. - * - * @param msg Exit message. - */ - private static void exit(String msg, Exception e) { - X.println(" "); - X.println(" # " + msg); - X.println(" # Setup failed, exiting... "); - - if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG"))) - e.printStackTrace(); - - System.exit(1); - } - - /** - * Prints message. - * - * @param msg Message. - */ - private static void println(String msg) { - X.println(" > " + msg); - } - - /** - * Prints warning. - * - * @param msg Message. - */ - private static void warn(String msg) { - X.println(" ! " + msg); - } - - /** - * Checks that CMD files have valid MS Windows new line characters. If not, writes question to console and reads the - * answer. If it's 'Y' then backups original files and corrects invalid new line characters. - * - * @param rootDir Root directory to process. - * @param dirs Directories inside of the root to process. - */ - private static void processCmdFiles(File rootDir, String... 
dirs) { - boolean answer = false; - - for (String dir : dirs) { - File subDir = new File(rootDir, dir); - - File[] cmdFiles = subDir.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.toLowerCase().endsWith(".cmd"); - } - }); - - for (File file : cmdFiles) { - String content = null; - - try (Scanner scanner = new Scanner(file)) { - content = scanner.useDelimiter("\\Z").next(); - } - catch (FileNotFoundException e) { - exit("Failed to read file '" + file + "'.", e); - } - - boolean invalid = false; - - for (int i = 0; i < content.length(); i++) { - if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) { - invalid = true; - - break; - } - } - - if (invalid) { - answer = answer || ask("One or more *.CMD files has invalid new line character. Replace them?"); - - if (!answer) { - println("Ok. But Windows most probably will fail to execute them..."); - - return; - } - - println("Fixing newline characters in file '" + file.getAbsolutePath() + "'."); - - renameToBak(file); - - try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { - for (int i = 0; i < content.length(); i++) { - if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) - writer.write("\r"); - - writer.write(content.charAt(i)); - } - } - catch (IOException e) { - exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e); - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java deleted file mode 100644 index c762181..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTaskCancelledException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.*; - -/** - * Exception that throws when the task is cancelling. - */ -public class GridHadoopTaskCancelledException extends IgniteException { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param msg Exception message. 
- */ - public GridHadoopTaskCancelledException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java deleted file mode 100644 index 763f45a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobPriority; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop utility methods. - */ -public class GridHadoopUtils { - /** Property to store timestamp of new job id request. */ - public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; - - /** Property to store timestamp of response of new job id request. */ - public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; - - /** Property to store timestamp of job submission. */ - public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; - - /** Property to set custom writer of job statistics. */ - public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; - - /** Staging constant. */ - private static final String STAGING_CONSTANT = ".staging"; - - /** Old mapper class attribute. */ - private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class"; - - /** Old reducer class attribute. */ - private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; - - /** - * Wraps native split. - * - * @param id Split ID. - * @param split Split. - * @param hosts Hosts. - * @throws IOException If failed. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java
deleted file mode 100644
index 763f45a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/GridHadoopUtils.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop utility methods.
- */
-public class GridHadoopUtils {
-    /** Property to store timestamp of new job id request. */
-    public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
-
-    /** Property to store timestamp of response of new job id request. */
-    public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
-
-    /** Property to store timestamp of job submission. */
-    public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
-
-    /** Property to set custom writer of job statistics. */
-    public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
-
-    /** Staging constant. */
-    private static final String STAGING_CONSTANT = ".staging";
-
-    /** Old mapper class attribute. */
-    private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
-
-    /** Old reducer class attribute. */
-    private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
-
-    /**
-     * Wraps native split.
-     *
-     * @param id Split ID.
-     * @param split Split.
-     * @param hosts Hosts.
-     * @throws IOException If failed.
-     */
-    public static GridHadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
-        ByteArrayOutputStream arr = new ByteArrayOutputStream();
-        ObjectOutput out = new ObjectOutputStream(arr);
-
-        assert split instanceof Writable;
-
-        ((Writable)split).write(out);
-
-        out.flush();
-
-        return new GridHadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
-    }
-
-    /**
-     * Unwraps native split.
-     *
-     * @param o Wrapper.
-     * @return Split.
-     */
-    public static Object unwrapSplit(GridHadoopSplitWrapper o) {
-        try {
-            Writable w = (Writable)GridHadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
-
-            w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
-
-            return w;
-        }
-        catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
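wrapSplit() and unwrapSplit() above rely on the split serializing itself through the Writable contract, so only the class name and the raw bytes need to travel. A hedged round-trip sketch of the same technique (Text stands in for a real input split; assumes hadoop-common on the classpath):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutput;
    import java.io.ObjectOutputStream;

    /** Round-trip sketch of the Writable wrap/unwrap technique above. */
    public class WritableRoundTrip {
        public static void main(String[] args) throws Exception {
            Writable split = new Text("hdfs://host/path:0+1024"); // Stand-in for a real split.

            // "Wrap": let the Writable serialize itself; keep only class name + bytes.
            ByteArrayOutputStream arr = new ByteArrayOutputStream();
            ObjectOutput out = new ObjectOutputStream(arr);

            split.write(out);
            out.flush();

            String clsName = split.getClass().getName();
            byte[] bytes = arr.toByteArray();

            // "Unwrap": instantiate by name, then let readFields() restore the state.
            Writable restored = (Writable)Class.forName(clsName).newInstance();

            restored.readFields(new ObjectInputStream(new ByteArrayInputStream(bytes)));

            System.out.println(restored); // Prints: hdfs://host/path:0+1024
        }
    }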
-
-    /**
-     * Converts Ignite job status to Hadoop job status.
-     *
-     * @param status Ignite job status.
-     * @param conf Configuration.
-     * @return Hadoop job status.
-     */
-    public static JobStatus status(GridHadoopJobStatus status, Configuration conf) {
-        JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
-
-        float setupProgress = 0;
-        float mapProgress = 0;
-        float reduceProgress = 0;
-        float cleanupProgress = 0;
-
-        JobStatus.State state = JobStatus.State.RUNNING;
-
-        switch (status.jobPhase()) {
-            case PHASE_SETUP:
-                setupProgress = 0.42f;
-
-                break;
-
-            case PHASE_MAP:
-                setupProgress = 1;
-                mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
-
-                break;
-
-            case PHASE_REDUCE:
-                assert status.totalReducerCnt() > 0;
-
-                setupProgress = 1;
-                mapProgress = 1;
-                reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
-
-                break;
-
-            case PHASE_CANCELLING:
-            case PHASE_COMPLETE:
-                if (!status.isFailed()) {
-                    setupProgress = 1;
-                    mapProgress = 1;
-                    reduceProgress = 1;
-                    cleanupProgress = 1;
-
-                    state = JobStatus.State.SUCCEEDED;
-                }
-                else
-                    state = JobStatus.State.FAILED;
-
-                break;
-
-            default:
-                assert false;
-        }
-
-        return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
-            JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
-    }
-
-    /**
-     * Gets staging area directory.
-     *
-     * @param conf Configuration.
-     * @param usr User.
-     * @return Staging area directory.
-     */
-    public static Path stagingAreaDir(Configuration conf, String usr) {
-        return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
-            + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
-    }
-
-    /**
-     * Gets job file.
-     *
-     * @param conf Configuration.
-     * @param usr User.
-     * @param jobId Job ID.
-     * @return Job file.
-     */
-    public static Path jobFile(Configuration conf, String usr, JobID jobId) {
-        return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
-    }
-
-    /**
-     * Checks that the given attribute is not set in the configuration.
-     *
-     * @param cfg Configuration.
-     * @param attr Attribute name.
-     * @param msg Message for creation of exception.
-     * @throws IgniteCheckedException If attribute is set.
-     */
-    public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
-        if (cfg.get(attr) != null)
-            throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
-    }
-
-    /**
-     * Creates JobInfo from hadoop configuration.
-     *
-     * @param cfg Hadoop configuration.
-     * @return Job info.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static GridHadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
-        JobConf jobConf = new JobConf(cfg);
-
-        boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
-            || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
-
-        int numReduces = jobConf.getNumReduceTasks();
-
-        jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
-
-        if (jobConf.getUseNewMapper()) {
-            String mode = "new map API";
-
-            ensureNotSet(jobConf, "mapred.input.format.class", mode);
-            ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
-
-            if (numReduces != 0)
-                ensureNotSet(jobConf, "mapred.partitioner.class", mode);
-            else
-                ensureNotSet(jobConf, "mapred.output.format.class", mode);
-        }
-        else {
-            String mode = "map compatibility";
-
-            ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
-            ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
-
-            if (numReduces != 0)
-                ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
-            else
-                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-        }
-
-        if (numReduces != 0) {
-            jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
-
-            if (jobConf.getUseNewReducer()) {
-                String mode = "new reduce API";
-
-                ensureNotSet(jobConf, "mapred.output.format.class", mode);
-                ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
-            }
-            else {
-                String mode = "reduce compatibility";
-
-                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-                ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
-            }
-        }
-
-        Map<String, String> props = new HashMap<>();
-
-        for (Map.Entry<String, String> entry : jobConf)
-            props.put(entry.getKey(), entry.getValue());
-
-        return new GridHadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
-    }
-
-    /**
-     * Creates a new {@link IgniteCheckedException} with the original exception serialized into a string.
-     * This is needed to transfer the error outside the current class loader.
-     *
-     * @param e Original exception.
-     * @return New exception.
-     */
-    public static IgniteCheckedException transformException(Throwable e) {
-        ByteArrayOutputStream os = new ByteArrayOutputStream();
-
-        e.printStackTrace(new PrintStream(os, true));
-
-        return new IgniteCheckedException(os.toString());
-    }
-
-    /**
-     * Returns work directory for job execution.
-     *
-     * @param locNodeId Local node ID.
-     * @param jobId Job ID.
-     * @return Working directory for job.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws IgniteCheckedException {
-        return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
-    }
-
-    /**
-     * Returns subdirectory of job working directory for task execution.
-     *
-     * @param locNodeId Local node ID.
-     * @param info Task info.
-     * @return Working directory for task.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws IgniteCheckedException {
-        File jobLocDir = jobLocalDir(locNodeId, info.jobId());
-
-        return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
-    }
-
-    /**
-     * Constructor.
-     */
-    private GridHadoopUtils() {
-        // No-op.
-    }
-}
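transformException() above flattens a foreign exception into its stack-trace string so the error can cross a class-loader boundary without the receiving side needing the original exception class. A minimal sketch of the same idea (RuntimeException stands in for IgniteCheckedException; names are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    /** Sketch of the stack-trace-to-string error transfer used by transformException(). */
    public class TransformExceptionSketch {
        public static void main(String[] args) {
            Throwable original = new IllegalStateException("mapper failed");

            ByteArrayOutputStream os = new ByteArrayOutputStream();

            original.printStackTrace(new PrintStream(os, true));

            // Only a plain String crosses the boundary; the receiving class
            // loader never needs the original exception class.
            RuntimeException transferable = new RuntimeException(os.toString());

            System.out.println(transferable.getMessage());
        }
    }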