http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java new file mode 100644 index 0000000..6915d17 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; +import org.objectweb.asm.*; +import org.objectweb.asm.commons.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Class loader that can load classes explicitly, without delegating to the parent class loader. + * It also supports parsing of class files to find dependencies that transitively reference classes + * unavailable to the parent. + */ +public class HadoopClassLoader extends URLClassLoader { + /** + * Register as parallel capable to allow concurrent loading of different classes. + */ + static { + registerAsParallelCapable(); + } + + /** */ + private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); + + /** */ + private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs()); + + /** */ + private static volatile Collection<URL> hadoopJars; + + /** */ + private static final Map<String, Boolean> cache = new ConcurrentHashMap8<>(); + + /** */ + private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); + + /** + * @param urls URLs. + */ + public HadoopClassLoader(URL[] urls) { + super(addHadoopUrls(urls), APP_CLS_LDR); + + assert !(getParent() instanceof HadoopClassLoader); + } + + /** + * Need to parse only Ignite Hadoop and IGFS classes. + * + * @param cls Class name. + * @return {@code true} if we need to check this class. + */ + private static boolean isIgfsHadoop(String cls) { + String ignitePackagePrefix = "org.apache.ignite"; + int len = ignitePackagePrefix.length(); + + return cls.startsWith(ignitePackagePrefix) && (cls.indexOf("igfs.", len) != -1 || cls.indexOf(".fs.", len) != -1 || cls.indexOf("hadoop.", len) != -1); + } + + /** + * @param cls Class name. + * @return {@code true} If this is a Hadoop class.
+ */ + private static boolean isHadoop(String cls) { + return cls.startsWith("org.apache.hadoop."); + } + + /** {@inheritDoc} */ + @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + try { + if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. + if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks. + return loadFromBytes(name, GridHadoopShutdownHookManager.class.getName()); + else if (name.endsWith(".util.NativeCodeLoader")) + return loadFromBytes(name, GridHadoopNativeCodeLoader.class.getName()); + + return loadClassExplicitly(name, resolve); + } + + if (isIgfsHadoop(name)) { // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. + Boolean hasDeps = cache.get(name); + + if (hasDeps == null) { + hasDeps = hasExternalDependencies(name, new HashSet<String>()); + + cache.put(name, hasDeps); + } + + if (hasDeps) + return loadClassExplicitly(name, resolve); + } + + return super.loadClass(name, resolve); + } + catch (NoClassDefFoundError | ClassNotFoundException e) { + throw new ClassNotFoundException("Failed to load class: " + name, e); + } + } + + /** + * @param name Name. + * @param replace Replacement. + * @return Class. + */ + private Class<?> loadFromBytes(final String name, final String replace) { + synchronized (getClassLoadingLock(name)) { + // First, check if the class has already been loaded + Class c = findLoadedClass(name); + + if (c != null) + return c; + + byte[] bytes = bytesCache.get(name); + + if (bytes == null) { + InputStream in = loadClassBytes(getParent(), replace); + + ClassReader rdr; + + try { + rdr = new ClassReader(in); + } + catch (IOException e) { + throw new RuntimeException(e); + } + + ClassWriter w = new ClassWriter(Opcodes.ASM4); + + rdr.accept(new RemappingClassAdapter(w, new Remapper() { + /** */ + String replaceType = replace.replace('.', '/'); + + /** */ + String nameType = name.replace('.', '/'); + + @Override public String map(String type) { + if (type.equals(replaceType)) + return nameType; + + return type; + } + }), ClassReader.EXPAND_FRAMES); + + bytes = w.toByteArray(); + + bytesCache.put(name, bytes); + } + + return defineClass(name, bytes, 0, bytes.length); + } + } + + /** + * @param name Class name. + * @param resolve Resolve class. + * @return Class. + * @throws ClassNotFoundException If failed. + */ + private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException { + synchronized (getClassLoadingLock(name)) { + // First, check if the class has already been loaded + Class c = findLoadedClass(name); + + if (c == null) { + long t1 = System.nanoTime(); + + c = findClass(name); + + // this is the defining class loader; record the stats + sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); + sun.misc.PerfCounter.getFindClasses().increment(); + } + + if (resolve) + resolveClass(c); + + return c; + } + } + + /** + * @param ldr Loader. + * @param clsName Class. + * @return Input stream. + */ + @Nullable private InputStream loadClassBytes(ClassLoader ldr, String clsName) { + return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); + } + + /** + * @param clsName Class name. + * @return {@code true} If the class has external dependencies. 
+ */ + boolean hasExternalDependencies(final String clsName, final Set<String> visited) { + if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. + return true; + + // Try to get from parent to check if the type accessible. + InputStream in = loadClassBytes(getParent(), clsName); + + if (in == null) // The class is external itself, it must be loaded from this class loader. + return true; + + if (!isIgfsHadoop(clsName)) // Other classes should not have external dependencies. + return false; + + final ClassReader rdr; + + try { + rdr = new ClassReader(in); + } + catch (IOException e) { + throw new RuntimeException("Failed to read class: " + clsName, e); + } + + visited.add(clsName); + + final AtomicBoolean hasDeps = new AtomicBoolean(); + + rdr.accept(new ClassVisitor(Opcodes.ASM4) { + AnnotationVisitor av = new AnnotationVisitor(Opcodes.ASM4) { + // TODO + }; + + FieldVisitor fv = new FieldVisitor(Opcodes.ASM4) { + @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) { + onType(desc); + + return av; + } + }; + + MethodVisitor mv = new MethodVisitor(Opcodes.ASM4) { + @Override public AnnotationVisitor visitAnnotation(String desc, boolean b) { + onType(desc); + + return av; + } + + @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { + onType(desc); + + return av; + } + + @Override public AnnotationVisitor visitAnnotationDefault() { + return av; + } + + @Override public void visitFieldInsn(int i, String owner, String name, String desc) { + onType(owner); + onType(desc); + } + + @Override public void visitFrame(int i, int i2, Object[] locTypes, int i3, Object[] stackTypes) { + for (Object o : locTypes) { + if (o instanceof String) + onType((String)o); + } + + for (Object o : stackTypes) { + if (o instanceof String) + onType((String)o); + } + } + + @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, + Label lb2, int i) { + onType(desc); + } + + @Override public void visitMethodInsn(int i, String owner, String name, String desc) { + onType(owner); + } + + @Override public void visitMultiANewArrayInsn(String desc, int dim) { + onType(desc); + } + + @Override public void visitTryCatchBlock(Label lb, Label lb2, Label lb3, String e) { + onType(e); + } + }; + + void onClass(String depCls) { + assert validateClassName(depCls) : depCls; + + if (depCls.startsWith("java.")) // Filter out platform classes. + return; + + if (visited.contains(depCls)) + return; + + Boolean res = cache.get(depCls); + + if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, visited))) + hasDeps.set(true); + } + + void onType(String type) { + if (type == null) + return; + + int off = 0; + + while (type.charAt(off) == '[') + off++; // Handle arrays. + + if (off != 0) + type = type.substring(off); + + if (type.length() == 1) + return; // Get rid of primitives. 
+ + if (type.charAt(type.length() - 1) == ';') { + assert type.charAt(0) == 'L' : type; + + type = type.substring(1, type.length() - 1); + } + + type = type.replace('/', '.'); + + onClass(type); + } + + @Override public void visit(int i, int i2, String name, String signature, String superName, + String[] ifaces) { + onType(superName); + + if (ifaces != null) { + for (String iface : ifaces) + onType(iface); + } + } + + @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { + onType(desc); + + return av; + } + + @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { + onType(name); + } + + @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { + onType(desc); + + return fv; + } + + @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, + String[] exceptions) { + if (exceptions != null) { + for (String e : exceptions) + onType(e); + } + + return mv; + } + }, 0); + + if (hasDeps.get()) // We already know that we have dependencies, no need to check parent. + return true; + + // At this point the class itself is known to have no external dependencies, but its enclosing (parent) class may have them. + int idx = clsName.lastIndexOf('$'); + + if (idx == -1) // No parent class. + return false; + + String parentCls = clsName.substring(0, idx); + + if (visited.contains(parentCls)) + return false; + + Boolean res = cache.get(parentCls); + + if (res == null) + res = hasExternalDependencies(parentCls, visited); + + return res; + } + + /** + * @param name Class name. + * @return {@code true} If this is a valid class name. + */ + private static boolean validateClassName(String name) { + int len = name.length(); + + if (len <= 1) + return false; + + if (!Character.isJavaIdentifierStart(name.charAt(0))) + return false; + + boolean hasDot = false; + + for (int i = 1; i < len; i++) { + char c = name.charAt(i); + + if (c == '.') + hasDot = true; + else if (!Character.isJavaIdentifierPart(c)) + return false; + } + + return hasDot; + } + + /** + * @param name Variable name. + * @param dflt Default. + * @return Value. + */ + private static String getEnv(String name, String dflt) { + String res = System.getProperty(name); + + if (F.isEmpty(res)) + res = System.getenv(name); + + return F.isEmpty(res) ? dflt : res; + } + + /** + * @param res Result. + * @param dir Directory. + * @param startsWith Starts with prefix. + * @throws Exception If failed. + */ + private static void addUrls(Collection<URL> res, File dir, final String startsWith) throws Exception { + File[] files = dir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return startsWith == null || name.startsWith(startsWith); + } + }); + + if (files == null) + throw new IOException("Path is not a directory: " + dir); + + for (File file : files) + res.add(file.toURI().toURL()); + } + + /** + * @param urls URLs. + * @return URLs. + */ + private static URL[] addHadoopUrls(URL[] urls) { + Collection<URL> hadoopJars; + + try { + hadoopJars = hadoopUrls(); + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + + ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length)); + + list.addAll(appJars); + list.addAll(hadoopJars); + + if (!F.isEmpty(urls)) + list.addAll(F.asList(urls)); + + return list.toArray(new URL[list.size()]); + } + + /** + * @return HADOOP_PREFIX or HADOOP_HOME variable value.
+ */ + @Nullable public static String hadoopHome() { + return getEnv("HADOOP_PREFIX", getEnv("HADOOP_HOME", null)); + } + + /** + * @return Collection of jar URLs. + * @throws IgniteCheckedException If failed. + */ + public static Collection<URL> hadoopUrls() throws IgniteCheckedException { + Collection<URL> hadoopUrls = hadoopJars; + + if (hadoopUrls != null) + return hadoopUrls; + + synchronized (HadoopClassLoader.class) { + hadoopUrls = hadoopJars; + + if (hadoopUrls != null) + return hadoopUrls; + + hadoopUrls = new ArrayList<>(); + + String hadoopPrefix = hadoopHome(); + + if (F.isEmpty(hadoopPrefix)) + throw new IgniteCheckedException("Failed to resolve Hadoop installation location. Either the HADOOP_PREFIX or " + + "the HADOOP_HOME environment variable must be set."); + + String commonHome = getEnv("HADOOP_COMMON_HOME", hadoopPrefix + "/share/hadoop/common"); + String hdfsHome = getEnv("HADOOP_HDFS_HOME", hadoopPrefix + "/share/hadoop/hdfs"); + String mapredHome = getEnv("HADOOP_MAPRED_HOME", hadoopPrefix + "/share/hadoop/mapreduce"); + + try { + addUrls(hadoopUrls, new File(commonHome + "/lib"), null); + addUrls(hadoopUrls, new File(hdfsHome + "/lib"), null); + addUrls(hadoopUrls, new File(mapredHome + "/lib"), null); + + addUrls(hadoopUrls, new File(hdfsHome), "hadoop-hdfs-"); + + addUrls(hadoopUrls, new File(commonHome), "hadoop-common-"); + addUrls(hadoopUrls, new File(commonHome), "hadoop-auth-"); + addUrls(hadoopUrls, new File(commonHome + "/lib"), "hadoop-auth-"); + + addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-common"); + addUrls(hadoopUrls, new File(mapredHome), "hadoop-mapreduce-client-core"); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + + hadoopJars = hadoopUrls; + + return hadoopUrls; + } + } +}
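The loadClass() override in this file inverts the usual parent-first delegation: any class matching isHadoop() is defined by this loader itself, so each HadoopClassLoader instance gets its own copy of the Hadoop classes even when Hadoop is also on the application classpath. The stand-alone sketch below distills that child-first pattern; the names ChildFirstClassLoader and isOwned are illustrative only and not part of this commit:

import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstClassLoader extends URLClassLoader {
    static {
        // Like HadoopClassLoader, allow concurrent loading of different classes.
        registerAsParallelCapable();
    }

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    /** Classes matching this predicate are always defined by this loader, never by the parent. */
    private static boolean isOwned(String name) {
        return name.startsWith("org.apache.hadoop.");
    }

    @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            if (isOwned(name)) {
                Class<?> c = findLoadedClass(name); // Reuse a class this loader already defined.

                if (c == null)
                    c = findClass(name); // Define it here instead of delegating up.

                if (resolve)
                    resolveClass(c);

                return c;
            }

            return super.loadClass(name, resolve); // Standard parent-first path for everything else.
        }
    }
}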
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java new file mode 100644 index 0000000..cea11eb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopComponent.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +/** + * Abstract class for all hadoop components. + */ +public abstract class HadoopComponent { + /** Hadoop context. */ + protected HadoopContext ctx; + + /** Logger. */ + protected IgniteLogger log; + + /** + * @param ctx Hadoop context. + */ + public void start(HadoopContext ctx) throws IgniteCheckedException { + this.ctx = ctx; + + log = ctx.kernalContext().log(getClass()); + } + + /** + * Stops manager. + */ + public void stop(boolean cancel) { + // No-op. + } + + /** + * Callback invoked when all grid components are started. + */ + public void onKernalStart() throws IgniteCheckedException { + // No-op. + } + + /** + * Callback invoked before all grid components are stopped. + */ + public void onKernalStop(boolean cancel) { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java new file mode 100644 index 0000000..bb707c8 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * Hadoop accelerator context. + */ +public class HadoopContext { + /** Kernal context. */ + private GridKernalContext ctx; + + /** Hadoop configuration. */ + private GridHadoopConfiguration cfg; + + /** Job tracker. */ + private HadoopJobTracker jobTracker; + + /** External task executor. */ + private HadoopTaskExecutorAdapter taskExecutor; + + /** */ + private HadoopShuffle shuffle; + + /** Managers list. */ + private List<HadoopComponent> components = new ArrayList<>(); + + /** + * @param ctx Kernal context. + */ + public HadoopContext( + GridKernalContext ctx, + GridHadoopConfiguration cfg, + HadoopJobTracker jobTracker, + HadoopTaskExecutorAdapter taskExecutor, + HadoopShuffle shuffle + ) { + this.ctx = ctx; + this.cfg = cfg; + + this.jobTracker = add(jobTracker); + this.taskExecutor = add(taskExecutor); + this.shuffle = add(shuffle); + } + + /** + * Gets list of managers. + * + * @return List of managers. + */ + public List<HadoopComponent> components() { + return components; + } + + /** + * Gets kernal context. + * + * @return Grid kernal context instance. + */ + public GridKernalContext kernalContext() { + return ctx; + } + + /** + * Gets Hadoop configuration. + * + * @return Hadoop configuration. + */ + public GridHadoopConfiguration configuration() { + return cfg; + } + + /** + * Gets local node ID. Shortcut for {@code kernalContext().localNodeId()}. + * + * @return Local node ID. + */ + public UUID localNodeId() { + return ctx.localNodeId(); + } + + /** + * Gets local node order. + * + * @return Local node order. + */ + public long localNodeOrder() { + assert ctx.discovery() != null; + + return ctx.discovery().localNode().order(); + } + + /** + * @return Hadoop-enabled nodes. + */ + public Collection<ClusterNode> nodes() { + return ctx.discovery().cacheNodes(CU.SYS_CACHE_HADOOP_MR, ctx.discovery().topologyVersion()); + } + + /** + * @return {@code True} if the local node is the job update leader (the Hadoop-enabled node with the minimum order). + */ + public boolean jobUpdateLeader() { + long minOrder = Long.MAX_VALUE; + ClusterNode minOrderNode = null; + + for (ClusterNode node : nodes()) { + if (node.order() < minOrder) { + minOrder = node.order(); + minOrderNode = node; + } + } + + assert minOrderNode != null; + + return localNodeId().equals(minOrderNode.id()); + } + + /** + * @param meta Job metadata. + * @return {@code true} If local node is participating in job execution.
+ */ + public boolean isParticipating(GridHadoopJobMetadata meta) { + UUID locNodeId = localNodeId(); + + if (locNodeId.equals(meta.submitNodeId())) + return true; + + GridHadoopMapReducePlan plan = meta.mapReducePlan(); + + return plan.mapperNodeIds().contains(locNodeId) || plan.reducerNodeIds().contains(locNodeId) || jobUpdateLeader(); + } + + /** + * @return Job tracker instance. + */ + public HadoopJobTracker jobTracker() { + return jobTracker; + } + + /** + * @return Task executor. + */ + public HadoopTaskExecutorAdapter taskExecutor() { + return taskExecutor; + } + + /** + * @return Shuffle. + */ + public HadoopShuffle shuffle() { + return shuffle; + } + + /** + * @return Map-reduce planner. + */ + public GridHadoopMapReducePlanner planner() { + return cfg.getMapReducePlanner(); + } + + /** + * Adds component. + * + * @param c Component to add. + * @return Added manager. + */ + private <C extends HadoopComponent> C add(C c) { + components.add(c); + + return c; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java index bdf8fc6..8655e14 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounterGroup.java @@ -24,7 +24,7 @@ import java.io.*; import java.util.*; /** - * Hadoop counter group adapter. + * Hadoop counter group adapter. */ class HadoopCounterGroup implements CounterGroup { /** Counters. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java new file mode 100644 index 0000000..370b82d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.lang.reflect.*; +import java.util.*; + +/** + * Hadoop job info based on default Hadoop configuration. + */ +public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { + /** */ + private static final long serialVersionUID = 5489900236464999951L; + + /** {@code true} If job has combiner. */ + private boolean hasCombiner; + + /** Number of reducers configured for job. */ + private int numReduces; + + /** Configuration. */ + private Map<String,String> props = new HashMap<>(); + + /** Job name. */ + private String jobName; + + /** User name. */ + private String user; + + /** */ + private static volatile Class<?> jobCls; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopDefaultJobInfo() { + // No-op. + } + + /** + * Constructor. + * + * @param jobName Job name. + * @param user User name. + * @param hasCombiner {@code true} If job has combiner. + * @param numReduces Number of reducers configured for job. + * @param props All other properties of the job. + */ + public HadoopDefaultJobInfo(String jobName, String user, boolean hasCombiner, int numReduces, + Map<String, String> props) { + this.jobName = jobName; + this.user = user; + this.hasCombiner = hasCombiner; + this.numReduces = numReduces; + this.props = props; + } + + /** {@inheritDoc} */ + @Nullable @Override public String property(String name) { + return props.get(name); + } + + /** {@inheritDoc} */ + @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + try { + Class<?> jobCls0 = jobCls; + + if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes. + synchronized (HadoopDefaultJobInfo.class) { + if ((jobCls0 = jobCls) == null) { + HadoopClassLoader ldr = new HadoopClassLoader(null); + + jobCls = jobCls0 = ldr.loadClass(GridHadoopV2Job.class.getName()); + } + } + } + + Constructor<?> constructor = jobCls0.getConstructor(GridHadoopJobId.class, HadoopDefaultJobInfo.class, + IgniteLogger.class); + + return (GridHadoopJob)constructor.newInstance(jobId, this, log); + } + // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. 
+ catch (Throwable t) { + throw new IgniteCheckedException(t); + } + } + + /** {@inheritDoc} */ + @Override public boolean hasCombiner() { + return hasCombiner; + } + + /** {@inheritDoc} */ + @Override public boolean hasReducer() { + return reducers() > 0; + } + + /** {@inheritDoc} */ + @Override public int reducers() { + return numReduces; + } + + /** {@inheritDoc} */ + @Override public String jobName() { + return jobName; + } + + /** {@inheritDoc} */ + @Override public String user() { + return user; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, jobName); + U.writeString(out, user); + + out.writeBoolean(hasCombiner); + out.writeInt(numReduces); + + U.writeStringMap(out, props); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobName = U.readString(in); + user = U.readString(in); + + hasCombiner = in.readBoolean(); + numReduces = in.readInt(); + + props = U.readStringMap(in); + } + + /** + * @return Properties of the job. + */ + public Map<String, String> properties() { + return props; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java new file mode 100644 index 0000000..80fd995 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopImpl.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.jetbrains.annotations.*; + +/** + * Hadoop facade implementation. + */ +public class HadoopImpl implements GridHadoop { + /** Hadoop processor. */ + private final IgniteHadoopProcessor proc; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** + * Constructor. + * + * @param proc Hadoop processor. 
+ */ + HadoopImpl(IgniteHadoopProcessor proc) { + this.proc = proc; + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration configuration() { + return proc.config(); + } + + /** {@inheritDoc} */ + @Override public GridHadoopJobId nextJobId() { + if (busyLock.enterBusy()) { + try { + return proc.nextJobId(); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get next job ID (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) { + if (busyLock.enterBusy()) { + try { + return proc.submit(jobId, jobInfo); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to submit job (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.status(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get job status (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Nullable @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.counters(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get job counters (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.finishFuture(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to get job finish future (grid is stopping)."); + } + + /** {@inheritDoc} */ + @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException { + if (busyLock.enterBusy()) { + try { + return proc.kill(jobId); + } + finally { + busyLock.leaveBusy(); + } + } + else + throw new IllegalStateException("Failed to kill job (grid is stopping)."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java new file mode 100644 index 0000000..35df5da --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.net.*; +import java.nio.file.*; +import java.text.*; +import java.util.*; + +import static org.apache.ignite.internal.IgniteVersionUtils.*; + +/** + * Setup tool to configure Hadoop client. + */ +public class HadoopSetup { + /** */ + public static final String WINUTILS_EXE = "winutils.exe"; + + /** */ + private static final FilenameFilter IGNITE_JARS = new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("ignite-") && name.endsWith(".jar"); + } + }; + + /** + * The main method. + * @param ignore Params. + */ + public static void main(String[] ignore) { + X.println( + " __________ ________________ ", + " / _/ ___/ |/ / _/_ __/ __/ ", + " _/ // (_ / // / / / / _/ ", + "/___/\\___/_/|_/___/ /_/ /___/ ", + " for Apache Hadoop ", + " "); + + println("Version " + ACK_VER_STR); + + configureHadoop(); + } + + /** + * This operation prepares a clean, unpacked Hadoop distribution to work as a client with Ignite-Hadoop. + * It performs these operations: + * <ul> + * <li>Checks that the HADOOP_HOME environment variable is set.</li> + * <li>Tries to resolve HADOOP_COMMON_HOME or evaluates it relative to HADOOP_HOME.</li> + * <li>On Windows, checks that winutils.exe exists and tries to work around known restrictions.</li> + * <li>On Windows, checks for new line character issues in CMD scripts.</li> + * <li>Scans the Hadoop lib directory for Ignite JARs and, if they are missing, tries to create symbolic links to them.</li> + * </ul> + */ + private static void configureHadoop() { + String igniteHome = U.getIgniteHome(); + + println("IGNITE_HOME is set to '" + igniteHome + "'."); + + checkIgniteHome(igniteHome); + + String homeVar = "HADOOP_HOME"; + String hadoopHome = System.getenv(homeVar); + + if (F.isEmpty(hadoopHome)) { + homeVar = "HADOOP_PREFIX"; + hadoopHome = System.getenv(homeVar); + } + + if (F.isEmpty(hadoopHome)) + exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " + + "valid Hadoop installation directory and run setup tool again.", null); + + hadoopHome = hadoopHome.replaceAll("\"", ""); + + println(homeVar + " is set to '" + hadoopHome + "'."); + + String hiveHome = System.getenv("HIVE_HOME"); + + if (!F.isEmpty(hiveHome)) { + hiveHome = hiveHome.replaceAll("\"", ""); + + println("HIVE_HOME is set to '" + hiveHome + "'."); + } + + File hadoopDir = new File(hadoopHome); + + if (!hadoopDir.exists()) + exit("Hadoop installation folder does not exist.", null); + + if (!hadoopDir.isDirectory()) + exit("HADOOP_HOME must point to a directory.", null); + + if (!hadoopDir.canRead()) + exit("Hadoop installation folder can not be read.
Please check permissions.", null); + + File hadoopCommonDir; + + String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME"); + + if (F.isEmpty(hadoopCommonHome)) { + hadoopCommonDir = new File(hadoopDir, "share/hadoop/common"); + + println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'."); + } + else { + println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'."); + + hadoopCommonDir = new File(hadoopCommonHome); + } + + if (!hadoopCommonDir.canRead()) + exit("Failed to read Hadoop common dir in '" + hadoopCommonHome + "'.", null); + + File hadoopCommonLibDir = new File(hadoopCommonDir, "lib"); + + if (!hadoopCommonLibDir.canRead()) + exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null); + + if (U.isWindows()) { + checkJavaPathSpaces(); + + File hadoopBinDir = new File(hadoopDir, "bin"); + + if (!hadoopBinDir.canRead()) + exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null); + + File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE); + + if (!winutilsFile.exists()) { + if (ask("File '" + WINUTILS_EXE + "' does not exist. " + + "It may be replaced by a stub. Create it?")) { + println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'."); + + boolean ok = false; + + try { + ok = winutilsFile.createNewFile(); + } + catch (IOException ignore) { + // No-op. + } + + if (!ok) + exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null); + } + else + println("Ok. But Hadoop client probably will not work on Windows this way..."); + } + + processCmdFiles(hadoopDir, "bin", "sbin", "libexec"); + } + + File igniteLibs = new File(new File(igniteHome), "libs"); + + if (!igniteLibs.exists()) + exit("Ignite 'libs' folder is not found.", null); + + Collection<File> jarFiles = new ArrayList<>(); + + addJarsInFolder(jarFiles, igniteLibs); + addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop")); + + boolean jarsLinksCorrect = true; + + for (File file : jarFiles) { + File link = new File(hadoopCommonLibDir, file.getName()); + + jarsLinksCorrect &= isJarLinkCorrect(link, file); + + if (!jarsLinksCorrect) + break; + } + + if (!jarsLinksCorrect) { + if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " + + "Create appropriate symbolic links?")) { + File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS); + + if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " + + "installation. They must be deleted to continue. Continue?")) { + for (File file : oldIgniteJarFiles) { + println("Deleting file '" + file.getAbsolutePath() + "'."); + + if (!file.delete()) + exit("Failed to delete file '" + file.getPath() + "'.", null); + } + } + + for (File file : jarFiles) { + File targetFile = new File(hadoopCommonLibDir, file.getName()); + + try { + println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'."); + + Files.createSymbolicLink(targetFile.toPath(), file.toPath()); + } + catch (IOException e) { + if (U.isWindows()) { + warn("Ability to create symbolic links is required!"); + warn("On Windows platform you have to grant permission 'Create symbolic links'"); + warn("to your user or run the Accelerator as Administrator."); + } + + exit("Creating symbolic link failed! Check permissions.", e); + } + } + } + else + println("Ok. 
But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath..."); + } + + File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop"); + + File igniteDocs = new File(igniteHome, "docs"); + + if (!igniteDocs.canRead()) + exit("Failed to read Ignite 'docs' folder at '" + igniteDocs.getAbsolutePath() + "'.", null); + + if (hadoopEtc.canWrite()) { // TODO Bigtop + if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " + + "(existing files will be backed up)?")) { + replaceWithBackup(new File(igniteDocs, "core-site.ignite.xml"), new File(hadoopEtc, "core-site.xml")); + + replaceWithBackup(new File(igniteDocs, "mapred-site.ignite.xml"), new File(hadoopEtc, "mapred-site.xml")); + } + else + println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory..."); + } + + if (!F.isEmpty(hiveHome)) { + File hiveConfDir = new File(hiveHome + File.separator + "conf"); + + if (!hiveConfDir.canWrite()) + warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " + + "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory."); + else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?")) + replaceWithBackup(new File(igniteDocs, "hive-site.ignite.xml"), new File(hiveConfDir, "hive-site.xml")); + else + println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory..."); + } + + println("Apache Hadoop setup is complete."); + } + + /** + * @param jarFiles Jars. + * @param folder Folder. + */ + private static void addJarsInFolder(Collection<File> jarFiles, File folder) { + if (!folder.exists()) + exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null); + + jarFiles.addAll(Arrays.asList(folder.listFiles(IGNITE_JARS))); + } + + /** + * Checks that JAVA_HOME does not contain space characters. + */ + private static void checkJavaPathSpaces() { + String javaHome = System.getProperty("java.home"); + + if (javaHome.contains(" ")) { + warn("Java installation path contains space characters!"); + warn("Hadoop client will not be able to start using '" + javaHome + "'."); + warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation."); + } + } + + /** + * Checks Ignite home. + * + * @param igniteHome Ignite home. + */ + private static void checkIgniteHome(String igniteHome) { + URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation(); + + try { + Path jar = Paths.get(jarUrl.toURI()); + Path igHome = Paths.get(igniteHome); + + if (!jar.startsWith(igHome)) + exit("Ignite JAR files are not under IGNITE_HOME.", null); + } + catch (Exception e) { + exit(e.getMessage(), e); + } + } + + /** + * Replaces target file with source file. + * + * @param from From. + * @param to To. + */ + private static void replaceWithBackup(File from, File to) { + if (!from.canRead()) + exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null); + + println("Replacing file '" + to.getAbsolutePath() + "'."); + + try { + U.copy(from, renameToBak(to), true); + } + catch (IOException e) { + exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e); + } + } + + /** + * Renames file for backup. + * + * @param file File. + * @return File. 
+ */ + private static File renameToBak(File file) { + DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); + + if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak"))) + exit("Failed to rename file '" + file.getPath() + "'.", null); + + return file; + } + + /** + * Checks if link is correct. + * + * @param link Symbolic link. + * @param correctTarget Correct link target. + * @return {@code true} If link target is correct. + */ + private static boolean isJarLinkCorrect(File link, File correctTarget) { + if (!Files.isSymbolicLink(link.toPath())) + return false; // It is a real file or it does not exist. + + Path target = null; + + try { + target = Files.readSymbolicLink(link.toPath()); + } + catch (IOException e) { + exit("Failed to read symbolic link: " + link.getAbsolutePath(), e); + } + + return Files.exists(target) && target.toFile().equals(correctTarget); + } + + /** + * Writes the question and reads the boolean answer from the console. + * + * @param question Question to write. + * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise. + */ + private static boolean ask(String question) { + X.println(); + X.print(" < " + question + " (Y/N): "); + + String answer = null; + + if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES"))) + answer = "Y"; + else { + BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); + + try { + answer = br.readLine(); + } + catch (IOException e) { + exit("Failed to read answer: " + e.getMessage(), e); + } + } + + if (answer != null && "Y".equals(answer.toUpperCase().trim())) { + X.println(" > Yes."); + + return true; + } + else { + X.println(" > No."); + + return false; + } + } + + /** + * Exit with message. + * + * @param msg Exit message. + * @param e Optional cause; its stack trace is printed when IGNITE_HADOOP_SETUP_DEBUG is set. + */ + private static void exit(String msg, Exception e) { + X.println(" "); + X.println(" # " + msg); + X.println(" # Setup failed, exiting... "); + + if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG"))) + e.printStackTrace(); + + System.exit(1); + } + + /** + * Prints message. + * + * @param msg Message. + */ + private static void println(String msg) { + X.println(" > " + msg); + } + + /** + * Prints warning. + * + * @param msg Message. + */ + private static void warn(String msg) { + X.println(" ! " + msg); + } + + /** + * Checks that CMD files have valid MS Windows new line characters. If not, writes a question to the console and reads the + * answer. If it's 'Y', then backs up the original files and corrects the invalid new line characters. + * + * @param rootDir Root directory to process. + * @param dirs Directories inside of the root to process. + */ + private static void processCmdFiles(File rootDir, String...
dirs) { + boolean answer = false; + + for (String dir : dirs) { + File subDir = new File(rootDir, dir); + + File[] cmdFiles = subDir.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.toLowerCase().endsWith(".cmd"); + } + }); + + if (cmdFiles == null) // listFiles() returns null if the subdirectory is missing or unreadable. + continue; + + for (File file : cmdFiles) { + String content = null; + + try (Scanner scanner = new Scanner(file)) { + content = scanner.useDelimiter("\\Z").next(); + } + catch (FileNotFoundException e) { + exit("Failed to read file '" + file + "'.", e); + } + + boolean invalid = false; + + for (int i = 0; i < content.length(); i++) { + if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) { + invalid = true; + + break; + } + } + + if (invalid) { + answer = answer || ask("One or more *.CMD files have an invalid new line character. Replace them?"); + + if (!answer) { + println("Ok. But Windows most probably will fail to execute them..."); + + return; + } + + println("Fixing newline characters in file '" + file.getAbsolutePath() + "'."); + + renameToBak(file); + + try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) { + for (int i = 0; i < content.length(); i++) { + if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) + writer.write("\r"); + + writer.write(content.charAt(i)); + } + } + catch (IOException e) { + exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java new file mode 100644 index 0000000..bb3d1cc --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.*; + +/** + * Exception thrown when a task is cancelled. + */ +public class HadoopTaskCancelledException extends IgniteException { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param msg Exception message.
+ */ + public HadoopTaskCancelledException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java new file mode 100644 index 0000000..46594ce --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobPriority; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop utility methods. + */ +public class HadoopUtils { + /** Property to store timestamp of new job id request. */ + public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs"; + + /** Property to store timestamp of response of new job id request. */ + public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs"; + + /** Property to store timestamp of job submission. */ + public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs"; + + /** Property to set custom writer of job statistics. */ + public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer"; + + /** Staging constant. */ + private static final String STAGING_CONSTANT = ".staging"; + + /** Old mapper class attribute. */ + private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class"; + + /** Old reducer class attribute. */ + private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class"; + + /** + * Wraps native split. + * + * @param id Split ID. + * @param split Split. + * @param hosts Hosts. + * @throws IOException If failed. 
+ */ + public static GridHadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { + ByteArrayOutputStream arr = new ByteArrayOutputStream(); + ObjectOutput out = new ObjectOutputStream(arr); + + assert split instanceof Writable; + + ((Writable)split).write(out); + + out.flush(); + + return new GridHadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); + } + + /** + * Unwraps native split. + * + * @param o Wrapper. + * @return Split. + */ + public static Object unwrapSplit(GridHadoopSplitWrapper o) { + try { + Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance(); + + w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes()))); + + return w; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + /** + * Convert Ignite job status to Hadoop job status. + * + * @param status Ignite job status. + * @return Hadoop job status. + */ + public static JobStatus status(GridHadoopJobStatus status, Configuration conf) { + JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId()); + + float setupProgress = 0; + float mapProgress = 0; + float reduceProgress = 0; + float cleanupProgress = 0; + + JobStatus.State state = JobStatus.State.RUNNING; + + switch (status.jobPhase()) { + case PHASE_SETUP: + setupProgress = 0.42f; + + break; + + case PHASE_MAP: + setupProgress = 1; + mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt(); + + break; + + case PHASE_REDUCE: + assert status.totalReducerCnt() > 0; + + setupProgress = 1; + mapProgress = 1; + reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt(); + + break; + + case PHASE_CANCELLING: + case PHASE_COMPLETE: + if (!status.isFailed()) { + setupProgress = 1; + mapProgress = 1; + reduceProgress = 1; + cleanupProgress = 1; + + state = JobStatus.State.SUCCEEDED; + } + else + state = JobStatus.State.FAILED; + + break; + + default: + assert false; + } + + return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state, + JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A"); + } + + /** + * Gets staging area directory. + * + * @param conf Configuration. + * @param usr User. + * @return Staging area directory. + */ + public static Path stagingAreaDir(Configuration conf, String usr) { + return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) + + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT); + } + + /** + * Gets job file. + * + * @param conf Configuration. + * @param usr User. + * @param jobId Job ID. + * @return Job file. + */ + public static Path jobFile(Configuration conf, String usr, JobID jobId) { + return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE); + } + + /** + * Checks the attribute in configuration is not set. + * + * @param attr Attribute name. + * @param msg Message for creation of exception. + * @throws IgniteCheckedException If attribute is set. + */ + public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException { + if (cfg.get(attr) != null) + throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode."); + } + + /** + * Creates JobInfo from hadoop configuration. + * + * @param cfg Hadoop configuration. + * @return Job info. + * @throws IgniteCheckedException If failed. 
+ */ + public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException { + JobConf jobConf = new JobConf(cfg); + + boolean hasCombiner = jobConf.get("mapred.combiner.class") != null + || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null; + + int numReduces = jobConf.getNumReduceTasks(); + + jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null); + + if (jobConf.getUseNewMapper()) { + String mode = "new map API"; + + ensureNotSet(jobConf, "mapred.input.format.class", mode); + ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, "mapred.partitioner.class", mode); + else + ensureNotSet(jobConf, "mapred.output.format.class", mode); + } + else { + String mode = "map compatibility"; + + ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode); + + if (numReduces != 0) + ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode); + else + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + } + + if (numReduces != 0) { + jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null); + + if (jobConf.getUseNewReducer()) { + String mode = "new reduce API"; + + ensureNotSet(jobConf, "mapred.output.format.class", mode); + ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode); + } + else { + String mode = "reduce compatibility"; + + ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode); + ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode); + } + } + + Map<String, String> props = new HashMap<>(); + + for (Map.Entry<String, String> entry : jobConf) + props.put(entry.getKey(), entry.getValue()); + + return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props); + } + + /** + * Throws new {@link IgniteCheckedException} with original exception is serialized into string. + * This is needed to transfer error outside the current class loader. + * + * @param e Original exception. + * @return IgniteCheckedException New exception. + */ + public static IgniteCheckedException transformException(Throwable e) { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + + e.printStackTrace(new PrintStream(os, true)); + + return new IgniteCheckedException(os.toString()); + } + + /** + * Returns work directory for job execution. + * + * @param locNodeId Local node ID. + * @param jobId Job ID. + * @return Working directory for job. + * @throws IgniteCheckedException If Failed. + */ + public static File jobLocalDir(UUID locNodeId, GridHadoopJobId jobId) throws IgniteCheckedException { + return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId); + } + + /** + * Returns subdirectory of job working directory for task execution. + * + * @param locNodeId Local node ID. + * @param info Task info. + * @return Working directory for task. + * @throws IgniteCheckedException If Failed. + */ + public static File taskLocalDir(UUID locNodeId, GridHadoopTaskInfo info) throws IgniteCheckedException { + File jobLocDir = jobLocalDir(locNodeId, info.jobId()); + + return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt()); + } + + /** + * Constructor. + */ + private HadoopUtils() { + // No-op. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
index 4ef9e35..63e4854 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopClassLoader.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
@@ -41,7 +41,7 @@ public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter {
 
     /** Hadoop context. */
     @GridToStringExclude
-    private GridHadoopContext hctx;
+    private HadoopContext hctx;
 
     /** Hadoop facade for public API. */
     @GridToStringExclude
@@ -85,18 +85,18 @@ public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter {
         }
 
         if (ok) {
-            hctx = new GridHadoopContext(
+            hctx = new HadoopContext(
                 ctx,
                 cfg,
-                new GridHadoopJobTracker(),
-                cfg.isExternalExecution() ? new GridHadoopExternalTaskExecutor() : new GridHadoopEmbeddedTaskExecutor(),
-                new GridHadoopShuffle());
+                new HadoopJobTracker(),
+                cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
+                new HadoopShuffle());
 
-            for (GridHadoopComponent c : hctx.components())
+            for (HadoopComponent c : hctx.components())
                 c.start(hctx);
 
-            hadoop = new GridHadoopImpl(this);
+            hadoop = new HadoopImpl(this);
         }
     }
@@ -112,10 +112,10 @@
         if (hctx == null)
             return;
 
-        List<GridHadoopComponent> components = hctx.components();
+        List<HadoopComponent> components = hctx.components();
 
-        for (ListIterator<GridHadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
-            GridHadoopComponent c = it.previous();
+        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
 
             c.stop(cancel);
         }
@@ -128,7 +128,7 @@
         if (hctx == null)
             return;
 
-        for (GridHadoopComponent c : hctx.components())
+        for (HadoopComponent c : hctx.components())
            c.onKernalStart();
     }
@@ -139,10 +139,10 @@
        if (hctx == null)
            return;
 
-        List<GridHadoopComponent> components = hctx.components();
+        List<HadoopComponent> components = hctx.components();
 
-        for (ListIterator<GridHadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
-            GridHadoopComponent c = it.previous();
+        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
 
             c.onKernalStop(cancel);
         }
@@ -153,7 +153,7 @@
      *
      * @return Hadoop context.
      */
-    public GridHadoopContext context() {
+    public HadoopContext context() {
         return hctx;
     }
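Note the shutdown pattern above: components are started in list order, but stop() and onKernalStop() walk the component list backwards through a ListIterator positioned past the tail, so each component is stopped only after everything that was started after it. The same idiom in isolation, with a made-up Component interface standing in for HadoopComponent:

    import java.util.List;
    import java.util.ListIterator;

    public class ReverseStop {
        /** Minimal stand-in for HadoopComponent. */
        public interface Component {
            void stop(boolean cancel);
        }

        /** Stops components in the reverse of their start order. */
        public static void stopAll(List<Component> components, boolean cancel) {
            // listIterator(size()) starts past the last element; previous() walks backwards.
            for (ListIterator<Component> it = components.listIterator(components.size()); it.hasPrevious();) {
                Component c = it.previous();

                c.stop(cancel);
            }
        }
    }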
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
index 55dcc4c..d603d76 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
@@ -52,7 +52,7 @@ public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter {
 
         Configuration hadoopCfg = new Configuration();
 
-        for (Map.Entry<String, String> e : ((GridHadoopDefaultJobInfo)jobInfo).properties().entrySet())
+        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
             hadoopCfg.set(e.getKey(), e.getValue());
 
         String user = jobInfo.user();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
index d5ceebf..263a075 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.*;
 import java.io.*;
 import java.util.*;
 
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Counter for the job statistics accumulation.