http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java new file mode 100644 index 0000000..496a710 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopExternalSplit.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.ignite.internal.processors.hadoop.*; + +import java.io.*; + +/** + * Split serialized in external file. + */ +public class HadoopExternalSplit extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long off; + + /** + * For {@link Externalizable}. + */ + public HadoopExternalSplit() { + // No-op. + } + + /** + * @param hosts Hosts. + * @param off Offset of this split in external file. + */ + public HadoopExternalSplit(String[] hosts, long off) { + assert off >= 0 : off; + assert hosts != null; + + this.hosts = hosts; + this.off = off; + } + + /** + * @return Offset of this input split in external file. + */ + public long offset() { + return off; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(off); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + off = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopExternalSplit that = (HadoopExternalSplit) o; + + return off == that.off; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (int)(off ^ (off >>> 32)); + } +}
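Note: the split above carries only the preferred hosts and a byte offset into the job's external split file; the actual split is materialized later by seeking to that offset. A minimal sketch of that resolution step, assuming the standard layout written by Hadoop's JobSplitWriter (split class name as a Text string, then the serialized split body), which matches how HadoopV2Job.input() writes these offsets later in this patch. The method name and wiring are illustrative, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import java.io.IOException;

InputSplit readNativeSplit(Configuration conf, FSDataInputStream in, HadoopExternalSplit split) throws IOException {
    in.seek(split.offset()); // Position at the serialized split.

    String clsName = Text.readString(in); // Split class name is written first.

    try {
        Deserializer<?> deser = new SerializationFactory(conf).getDeserializer(conf.getClassByName(clsName));

        deser.open(in);

        return (InputSplit)deser.deserialize(null); // Null makes the deserializer allocate a fresh split.
    }
    catch (ClassNotFoundException e) {
        throw new IOException(e);
    }
}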
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java new file mode 100644 index 0000000..081182d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopNativeCodeLoader.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.classification.*; +import org.apache.hadoop.conf.*; + +/** + * A fake helper to load the native hadoop code i.e. libhadoop.so. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HadoopNativeCodeLoader { + /** + * Check if native-hadoop code is loaded for this platform. + * + * @return <code>true</code> if native-hadoop is loaded, + * else <code>false</code> + */ + public static boolean isNativeCodeLoaded() { + return false; + } + + /** + * Returns true only if this build was compiled with support for snappy. + */ + public static boolean buildSupportsSnappy() { + return false; + } + + /** + * @return Library name. + */ + public static String getLibraryName() { + throw new IllegalStateException(); + } + + /** + * Return if native hadoop libraries, if present, can be used for this job. + * @param conf configuration + * + * @return <code>true</code> if native hadoop libraries, if present, can be + * used for this job; <code>false</code> otherwise. + */ + public boolean getLoadNativeLibraries(Configuration conf) { + return false; + } + + /** + * Set if native hadoop libraries, if present, can be used for this job. + * + * @param conf configuration + * @param loadNativeLibraries can native hadoop libraries be loaded + */ + public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) { + // No-op. 
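Note: this stub mirrors the API of Hadoop's org.apache.hadoop.util.NativeCodeLoader but reports that no native code is available; presumably the Ignite Hadoop class loader resolves it in place of the original (an assumption, this hunk alone does not show the substitution). The intended effect on caller code, illustratively:

// With the stub on the class path the native probes always fail,
// so callers fall through to the pure-Java code path.
if (HadoopNativeCodeLoader.isNativeCodeLoaded() && HadoopNativeCodeLoader.buildSupportsSnappy()) {
    // Native snappy would be configured here; unreachable with the stub.
}
else {
    // Pure-Java fallback, always taken.
}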
+ } +} + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java new file mode 100644 index 0000000..bb9cb68 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSerializationWrapper.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.io.serializer.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * The wrapper around external serializer. + */ +public class HadoopSerializationWrapper<T> implements HadoopSerialization { + /** External serializer - writer. */ + private final Serializer<T> serializer; + + /** External serializer - reader. */ + private final Deserializer<T> deserializer; + + /** Data output for current write operation. */ + private OutputStream currOut; + + /** Data input for current read operation. */ + private InputStream currIn; + + /** Wrapper around current output to provide OutputStream interface. */ + private final OutputStream outStream = new OutputStream() { + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + currOut.write(b); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) throws IOException { + currOut.write(b, off, len); + } + }; + + /** Wrapper around current input to provide InputStream interface. */ + private final InputStream inStream = new InputStream() { + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return currIn.read(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + return currIn.read(b, off, len); + } + }; + + /** + * @param serialization External serializer to wrap. + * @param cls The class to serialize. 
+ */ + public HadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException { + assert cls != null; + + serializer = serialization.getSerializer(cls); + deserializer = serialization.getDeserializer(cls); + + try { + serializer.open(outStream); + deserializer.open(inStream); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { + assert out != null; + assert obj != null; + + try { + currOut = (OutputStream)out; + + serializer.serialize((T)obj); + + currOut = null; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { + assert in != null; + + try { + currIn = (InputStream)in; + + T res = deserializer.deserialize((T) obj); + + currIn = null; + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + try { + serializer.close(); + deserializer.close(); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java new file mode 100644 index 0000000..454f90a --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopShutdownHookManager.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Fake manager for shutdown hooks. + */ +public class HadoopShutdownHookManager { + /** */ + private static final HadoopShutdownHookManager MGR = new HadoopShutdownHookManager(); + + /** + * Return <code>ShutdownHookManager</code> singleton. + * + * @return <code>ShutdownHookManager</code> singleton. + */ + public static HadoopShutdownHookManager get() { + return MGR; + } + + /** */ + private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>()); + + /** */ + private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); + + /** + * Singleton. + */ + private HadoopShutdownHookManager() { + // No-op. 
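Note: a short usage sketch for HadoopSerializationWrapper (illustrative, not part of the patch). Hadoop's WritableSerialization is registered under io.serializations by default, and DataOutputStream/DataInputStream satisfy the wrapper's assumption that the DataOutput/DataInput passed in are also stream instances:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.SerializationFactory;
import java.io.*;

// Inside a method declared to throw IgniteCheckedException.
SerializationFactory factory = new SerializationFactory(new Configuration());

HadoopSerialization ser = new HadoopSerializationWrapper<>(factory.getSerialization(Text.class), Text.class);

ByteArrayOutputStream buf = new ByteArrayOutputStream();

ser.write(new DataOutputStream(buf), new Text("value")); // The cast to OutputStream happens inside write().

Text res = (Text)ser.read(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())), null);

ser.close();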
+ } + + /** + * Adds a shutdownHook with a priority, the higher the priority + * the earlier it will run. ShutdownHooks with the same priority run + * in a non-deterministic order. + * + * @param shutdownHook shutdownHook <code>Runnable</code> + * @param priority priority of the shutdownHook. + */ + public void addShutdownHook(Runnable shutdownHook, int priority) { + if (shutdownHook == null) + throw new IllegalArgumentException("shutdownHook cannot be NULL"); + + hooks.add(shutdownHook); + } + + /** + * Removes a shutdownHook. + * + * @param shutdownHook shutdownHook to remove. + * @return TRUE if the shutdownHook was registered and removed, + * FALSE otherwise. + */ + public boolean removeShutdownHook(Runnable shutdownHook) { + return hooks.remove(shutdownHook); + } + + /** + * Indicates if a shutdownHook is registered or not. + * + * @param shutdownHook shutdownHook to check if registered. + * @return TRUE/FALSE depending on whether the shutdownHook is registered. + */ + public boolean hasShutdownHook(Runnable shutdownHook) { + return hooks.contains(shutdownHook); + } + + /** + * Indicates if shutdown is in progress or not. + * + * @return TRUE if the shutdown is in progress, otherwise FALSE. + */ + public boolean isShutdownInProgress() { + return shutdownInProgress.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java new file mode 100644 index 0000000..bc7ded3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopSplitWrapper.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * The wrapper for native hadoop input splits. + * + * Warning!! This class must not depend on any Hadoop classes directly or indirectly. + */ +public class HadoopSplitWrapper extends HadoopInputSplit { + /** */ + private static final long serialVersionUID = 0L; + + /** Native hadoop input split. */ + private byte[] bytes; + + /** */ + private String clsName; + + /** Internal ID. */ + private int id; + + /** + * Creates new split wrapper. + */ + public HadoopSplitWrapper() { + // No-op. + } + + /** + * Creates new split wrapper. + * + * @param id Split ID. + * @param clsName Class name.
+ * @param bytes Serialized class. + * @param hosts Hosts where split is located. + */ + public HadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) { + assert hosts != null; + assert clsName != null; + assert bytes != null; + + this.hosts = hosts; + this.id = id; + + this.clsName = clsName; + this.bytes = bytes; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(id); + + out.writeUTF(clsName); + U.writeByteArray(out, bytes); + } + + /** + * @return Class name. + */ + public String className() { + return clsName; + } + + /** + * @return Class bytes. + */ + public byte[] bytes() { + return bytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readInt(); + + clsName = in.readUTF(); + bytes = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + HadoopSplitWrapper that = (HadoopSplitWrapper)o; + + return id == that.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java new file mode 100644 index 0000000..534033b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.*; + +import java.io.*; + +/** + * Hadoop cleanup task (commits or aborts job). + */ +public class HadoopV2CleanupTask extends HadoopV2Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. + * @param abort Abort flag. 
+ */ + public HadoopV2CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + OutputFormat outputFormat = getOutputFormat(jobCtx); + + OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); + + if (committer != null) { + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + } + catch (ClassNotFoundException | IOException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java new file mode 100644 index 0000000..3f8e2b6 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.task.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks. + */ +public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext { + /** Input reader overriding the HadoopTaskContext input. */ + private RecordReader reader; + + /** Output writer overriding the HadoopTaskContext output. */ + private RecordWriter writer; + + /** Output is provided by executor environment. */ + private final HadoopTaskOutput output; + + /** Input is provided by executor environment. */ + private final HadoopTaskInput input; + + /** Unique identifier for a task attempt. */ + private final TaskAttemptID taskAttemptID; + + /** Indicates that this task is to be cancelled. */ + private volatile boolean cancelled; + + /** Input split.
*/ + private InputSplit inputSplit; + + /** */ + private final HadoopTaskContext ctx; + + /** */ + private String status; + + /** + * @param ctx Context for IO operations. + */ + public HadoopV2Context(HadoopV2TaskContext ctx) { + super(ctx.jobConf(), ctx.jobContext().getJobID()); + + taskAttemptID = ctx.attemptId(); + + conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString()); + conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString()); + + output = ctx.output(); + input = ctx.input(); + + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() { + if (inputSplit == null) { + HadoopInputSplit split = ctx.taskInfo().inputSplit(); + + if (split == null) + return null; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock fileBlock = (HadoopFileBlock)split; + + inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); + } + else if (split instanceof HadoopExternalSplit) + throw new UnsupportedOperationException(); // TODO + else if (split instanceof HadoopSplitWrapper) + inputSplit = (InputSplit) HadoopUtils.unwrapSplit((HadoopSplitWrapper) split); + else + throw new IllegalStateException(); + } + + return inputSplit; + } + + /** {@inheritDoc} */ + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + return reader.nextKeyValue(); + } + + /** {@inheritDoc} */ + @Override public Object getCurrentKey() throws IOException, InterruptedException { + if (reader != null) + return reader.getCurrentKey(); + + return input.key(); + } + + /** {@inheritDoc} */ + @Override public Object getCurrentValue() throws IOException, InterruptedException { + return reader.getCurrentValue(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void write(Object key, Object val) throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + if (writer != null) + writer.write(key, val); + else { + try { + output.write(key, val); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + } + + /** {@inheritDoc} */ + @Override public OutputCommitter getOutputCommitter() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public TaskAttemptID getTaskAttemptID() { + return taskAttemptID; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String msg) { + status = msg; + } + + /** {@inheritDoc} */ + @Override public String getStatus() { + return status; + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public Counter getCounter(Enum<?> cntrName) { + return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name()); + } + + /** {@inheritDoc} */ + @Override public Counter getCounter(String grpName, String cntrName) { + return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void progress() { + // No-op. + } + + /** + * Overrides default input data reader. + * + * @param reader New reader. 
+ */ + public void reader(RecordReader reader) { + this.reader = reader; + } + + /** {@inheritDoc} */ + @Override public boolean nextKey() throws IOException, InterruptedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + return input.next(); + } + + /** {@inheritDoc} */ + @Override public Iterable getValues() throws IOException, InterruptedException { + return new Iterable() { + @Override public Iterator iterator() { + return input.values(); + } + }; + } + + /** + * @return Overridden output data writer. + */ + public RecordWriter writer() { + return writer; + } + + /** + * Overrides default output data writer. + * + * @param writer New writer. + */ + public void writer(RecordWriter writer) { + this.writer = writer; + } + + /** + * Cancels the task by stopping the IO. + */ + public void cancel() { + cancelled = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java new file mode 100644 index 0000000..96ede0d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +import java.io.*; + +/** + * Adapter from our own counter implementation to the Hadoop API Counter of version 2.0. + */ +public class HadoopV2Counter implements Counter { + /** Delegate. */ + private final HadoopLongCounter cntr; + + /** + * Creates new instance with given delegate. + * + * @param cntr Internal counter. + */ + public HadoopV2Counter(HadoopLongCounter cntr) { + assert cntr != null : "counter must be non-null"; + + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op.
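Note: HadoopV2Counter is a thin view over the Ignite-side HadoopLongCounter, so updates made through either interface observe the same value. An illustrative use, assuming taskCtx is the enclosing HadoopTaskContext (the same counter(...) call appears in HadoopV2Context.getCounter above):

// Both interfaces share the single underlying long value.
HadoopLongCounter delegate = taskCtx.counter("app", "records", HadoopLongCounter.class);

Counter cntr = new HadoopV2Counter(delegate);

cntr.increment(5); // Forwarded to the Ignite counter.

assert cntr.getValue() == delegate.value();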
+ } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java new file mode 100644 index 0000000..f2f0cab --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.split.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.internal.processors.hadoop.v1.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; + +import java.io.*; +import java.lang.reflect.*; +import java.util.*; +import java.util.Queue; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Hadoop job implementation for v2 API. + */ +public class HadoopV2Job implements HadoopJob { + /** */ + private final JobConf jobConf; + + /** */ + private final JobContextImpl jobCtx; + + /** Hadoop job ID. */ + private final HadoopJobId jobId; + + /** Job info. 
*/ + protected HadoopJobInfo jobInfo; + + /** */ + private final JobID hadoopJobID; + + /** */ + private final HadoopV2JobResourceManager rsrcMgr; + + /** */ + private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = + new ConcurrentHashMap8<>(); + + /** Pool of task context classes and thus of class loading environments. */ + private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + + /** Local node ID. */ + private UUID locNodeId; + + /** Serialized JobConf. */ + private volatile byte[] jobConfData; + + /** + * @param jobId Job ID. + * @param jobInfo Job info. + * @param log Logger. + */ + public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) { + assert jobId != null; + assert jobInfo != null; + + this.jobId = jobId; + this.jobInfo = jobInfo; + + hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); + + HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader(); + + // Before creating the JobConf instance we should set the new context class loader. + Thread.currentThread().setContextClassLoader(clsLdr); + + jobConf = new JobConf(); + + HadoopFileSystemsUtils.setupFileSystems(jobConf); + + Thread.currentThread().setContextClassLoader(null); + + for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); + + jobCtx = new JobContextImpl(jobConf, hadoopJobID); + + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log); + } + + /** {@inheritDoc} */ + @Override public HadoopJobId id() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public HadoopJobInfo info() { + return jobInfo; + } + + /** {@inheritDoc} */ + @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { + Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); + + try { + String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR); + + if (jobDirPath == null) { // The job was probably not submitted by a hadoop client. + // Assume that we have the needed classes and try to generate input splits ourselves. + if (jobConf.getUseNewMapper()) + return HadoopV2Splitter.splitJob(jobCtx); + else + return HadoopV1Splitter.splitJob(jobConf); + } + + Path jobDir = new Path(jobDirPath); + + try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) { + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, + jobDir); + + if (F.isEmpty(metaInfos)) + throw new IgniteCheckedException("No input splits found."); + + Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir); + + try (FSDataInputStream in = fs.open(splitsFile)) { + Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length); + + for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) { + long off = metaInfo.getStartOffset(); + + String[] hosts = metaInfo.getLocations(); + + in.seek(off); + + String clsName = Text.readString(in); + + HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts); + + if (block == null) + block = HadoopV2Splitter.readFileBlock(clsName, in, hosts); + + res.add(block != null ?
block : new HadoopExternalSplit(hosts, off)); + } + + return res; + } + } + catch (Throwable e) { + throw transformException(e); + } + } + finally { + Thread.currentThread().setContextClassLoader(null); + } + } + + /** {@inheritDoc} */ + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { + T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); + + GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId); + + if (fut != null) + return fut.get(); + + GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); + + if (old != null) + return old.get(); + + Class<?> cls = taskCtxClsPool.poll(); + + try { + if (cls == null) { + // If there is no pooled class, then load new one. + HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath()); + + cls = ldr.loadClass(HadoopV2TaskContext.class.getName()); + } + + Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, + HadoopJobId.class, UUID.class, DataInput.class); + + if (jobConfData == null) + synchronized(jobConf) { + if (jobConfData == null) { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + jobConf.write(new DataOutputStream(buf)); + + jobConfData = buf.toByteArray(); + } + } + + HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, + new DataInputStream(new ByteArrayInputStream(jobConfData))); + + fut.onDone(res); + + return res; + } + catch (Throwable e) { + IgniteCheckedException te = transformException(e); + + fut.onDone(te); + + throw te; + } + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { + this.locNodeId = locNodeId; + + Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); + + try { + rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); + } + finally { + Thread.currentThread().setContextClassLoader(null); + } + } + + /** {@inheritDoc} */ + @Override public void dispose(boolean external) throws IgniteCheckedException { + if (rsrcMgr != null && !external) { + File jobLocDir = jobLocalDir(locNodeId, jobId); + + if (jobLocDir.exists()) + U.delete(jobLocDir); + } + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); + + taskCtxClsPool.offer(ctx.getClass()); + + File locDir = taskLocalDir(locNodeId, info); + + if (locDir.exists()) + U.delete(locDir); + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + if (rsrcMgr != null) + rsrcMgr.cleanupStagingDirectory(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java new file mode 100644 index 0000000..6f6bfa1 --- /dev/null +++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.nio.file.*; +import java.util.*; + +/** + * Provides all resources needed for the job execution. Downloads the main jar, the configuration and additional + * files that need to be placed on the local file system. + */ +public class HadoopV2JobResourceManager { + /** Hadoop job context. */ + private final JobContextImpl ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Job ID. */ + private final HadoopJobId jobId; + + /** Class path list. */ + private URL[] clsPath; + + /** Set of local resources. */ + private final Collection<File> rsrcSet = new HashSet<>(); + + /** Staging directory used to deliver the job jar and config to the worker nodes. */ + private Path stagingDir; + + /** + * Creates new instance. + * @param jobId Job ID. + * @param ctx Hadoop job context. + * @param log Logger. + */ + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) { + this.jobId = jobId; + this.ctx = ctx; + this.log = log.getLogger(HadoopV2JobResourceManager.class); + } + + /** + * Sets the working directory in the local file system. + * + * @param dir Working directory. + * @throws IOException If failed. + */ + private void setLocalFSWorkingDirectory(File dir) throws IOException { + JobConf cfg = ctx.getJobConf(); + + Thread.currentThread().setContextClassLoader(cfg.getClassLoader()); + + try { + cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); + + if (!cfg.getBoolean("fs.file.impl.disable.cache", false)) + FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); + } + finally { + Thread.currentThread().setContextClassLoader(null); + } + } + + /** + * Prepares job resources: resolves the classpath list and downloads resources if needed. + * + * @param download {@code true} if resources need to be downloaded. + * @param jobLocDir Work directory for the job. + * @throws IgniteCheckedException If failed.
+ */ + public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException { + try { + if (jobLocDir.exists()) + throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath()); + + JobConf cfg = ctx.getJobConf(); + + String mrDir = cfg.get("mapreduce.job.dir"); + + if (mrDir != null) { + stagingDir = new Path(new URI(mrDir)); + + if (download) { + FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg); + + if (!fs.exists(stagingDir)) + throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + + stagingDir); + + if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) + throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); + } + + File jarJobFile = new File(jobLocDir, "job.jar"); + + Collection<URL> clsPathUrls = new ArrayList<>(); + + clsPathUrls.add(jarJobFile.toURI().toURL()); + + rsrcSet.add(jarJobFile); + rsrcSet.add(new File(jobLocDir, "job.xml")); + + processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES); + processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES); + processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null); + processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null); + + if (!clsPathUrls.isEmpty()) { + clsPath = new URL[clsPathUrls.size()]; + + clsPathUrls.toArray(clsPath); + } + } + else if (!jobLocDir.mkdirs()) + throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); + + setLocalFSWorkingDirectory(jobLocDir); + } + catch (URISyntaxException | IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Processes a list of resources. + * + * @param jobLocDir Job working directory. + * @param files Array of {@link java.net.URI} or {@link org.apache.hadoop.fs.Path} resources to process. + * @param download {@code true} if resources need to be downloaded; otherwise only the class path is processed. + * @param extract {@code true} if archives need to be extracted. + * @param clsPathUrls Collection to which resources are added as classpath entries. + * @param rsrcNameProp Configuration property to set with the array of resource names. + * @throws IOException If failed.
+ */ + private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract, + @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException { + if (F.isEmptyOrNulls(files)) + return; + + Collection<String> res = new ArrayList<>(); + + for (Object pathObj : files) { + String locName = null; + Path srcPath; + + if (pathObj instanceof URI) { + URI uri = (URI)pathObj; + + locName = uri.getFragment(); + + srcPath = new Path(uri); + } + else + srcPath = (Path)pathObj; + + if (locName == null) + locName = srcPath.getName(); + + File dstPath = new File(jobLocDir.getAbsolutePath(), locName); + + res.add(locName); + + rsrcSet.add(dstPath); + + if (clsPathUrls != null) + clsPathUrls.add(dstPath.toURI().toURL()); + + if (!download) + continue; + + JobConf cfg = ctx.getJobConf(); + + FileSystem dstFs = FileSystem.getLocal(cfg); + + FileSystem srcFs = srcPath.getFileSystem(cfg); + + if (extract) { + File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); + + if (!archivesPath.exists() && !archivesPath.mkdir()) + throw new IOException("Failed to create directory " + + "[path=" + archivesPath + ", jobId=" + jobId + ']'); + + File archiveFile = new File(archivesPath, locName); + + FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg); + + String archiveNameLC = archiveFile.getName().toLowerCase(); + + if (archiveNameLC.endsWith(".jar")) + RunJar.unJar(archiveFile, dstPath); + else if (archiveNameLC.endsWith(".zip")) + FileUtil.unZip(archiveFile, dstPath); + else if (archiveNameLC.endsWith(".tar.gz") || + archiveNameLC.endsWith(".tgz") || + archiveNameLC.endsWith(".tar")) + FileUtil.unTar(archiveFile, dstPath); + else + throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']'); + } + else + FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg); + } + + if (!res.isEmpty() && rsrcNameProp != null) + ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()])); + } + + /** + * Prepares working directory for the task. + * + * <ul> + * <li>Creates working directory.</li> + * <li>Creates symbolic links to all job resources in working directory.</li> + * </ul> + * + * @param path Path to working directory of the task. + * @throws IgniteCheckedException If fails. + */ + public void prepareTaskWorkDir(File path) throws IgniteCheckedException { + try { + if (path.exists()) + throw new IOException("Task local directory already exists: " + path); + + if (!path.mkdir()) + throw new IOException("Failed to create directory: " + path); + + for (File resource : rsrcSet) { + File symLink = new File(path, resource.getName()); + + try { + Files.createSymbolicLink(symLink.toPath(), resource.toPath()); + } + catch (IOException e) { + String msg = "Unable to create symlink \"" + symLink + "\" to \"" + resource + "\"."; + + if (U.isWindows() && e instanceof FileSystemException) + msg += "\n\nAbility to create symbolic links is required!\n" + + "On Windows platform you have to grant permission 'Create symbolic links'\n" + + "to your user or run the Accelerator as Administrator.\n"; + + throw new IOException(msg, e); + } + } + } + catch (IOException e) { + throw new IgniteCheckedException("Unable to prepare local working directory for the task " + + "[jobId=" + jobId + ", path=" + path+ ']', e); + } + } + + /** + * Cleans up job staging directory. 
+ */ + public void cleanupStagingDirectory() { + try { + if (stagingDir != null) + stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true); + } + catch (Exception e) { + log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); + } + } + + /** + * Returns array of class path for current job. + * + * @return Class path collection. + */ + @Nullable public URL[] classPath() { + return clsPath; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java new file mode 100644 index 0000000..2bf4292 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.map.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.*; + +/** + * Hadoop map task implementation for v2 API. + */ +public class HadoopV2MapTask extends HadoopV2Task { + /** + * @param taskInfo Task info. 
+ */ + public HadoopV2MapTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions", "unchecked"}) + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + HadoopInputSplit split = info().inputSplit(); + + InputSplit nativeSplit; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; + + nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null); + } + else + nativeSplit = (InputSplit)taskCtx.getNativeSplit(split); + + assert nativeSplit != null; + + OutputFormat outputFormat = null; + Exception err = null; + + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(), + hadoopContext().getConfiguration()); + + RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext()); + + reader.initialize(nativeSplit, hadoopContext()); + + hadoopContext().reader(reader); + + HadoopJobInfo jobInfo = taskCtx.job().info(); + + outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx); + + Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration()); + + try { + mapper.run(new WrappedMapper().getMapContext(hadoopContext())); + } + finally { + closeWriter(); + } + + commit(outputFormat); + } + catch (InterruptedException e) { + err = e; + + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + catch (Exception e) { + err = e; + + throw new IgniteCheckedException(e); + } + finally { + if (err != null) + abort(outputFormat); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java new file mode 100644 index 0000000..36382d4 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.internal.processors.hadoop.*; + +/** + * Hadoop partitioner adapter for v2 API. 
+ */ +public class HadoopV2Partitioner implements HadoopPartitioner { + /** Partitioner instance. */ + private Partitioner<Object, Object> part; + + /** + * @param cls Hadoop partitioner class. + * @param conf Job configuration. + */ + public HadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) { + part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key, Object val, int parts) { + return part.getPartition(key, val, parts); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java new file mode 100644 index 0000000..250c41b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.reduce.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.*; + +/** + * Hadoop reduce task implementation for v2 API. + */ +public class HadoopV2ReduceTask extends HadoopV2Task { + /** {@code True} if reduce, {@code false} if combine. */ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. + */ + public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions", "unchecked"}) + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + OutputFormat outputFormat = null; + Exception err = null; + + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null; + + Reducer reducer = ReflectionUtils.newInstance(reduce ? 
jobCtx.getReducerClass() : jobCtx.getCombinerClass(),
+                jobCtx.getConfiguration());
+
+            try {
+                reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+            }
+            finally {
+                closeWriter();
+            }
+
+            commit(outputFormat);
+        }
+        catch (InterruptedException e) {
+            err = e;
+
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (Exception e) {
+            err = e;
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            if (err != null)
+                abort(outputFormat);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
new file mode 100644
index 0000000..81587c1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.*;
+
+import java.io.*;
+
+/**
+ * Hadoop setup task (prepares job).
+ */
+public class HadoopV2SetupTask extends HadoopV2Task {
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+ */ + public HadoopV2SetupTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override protected void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + try { + JobContextImpl jobCtx = taskCtx.jobContext(); + + OutputFormat outputFormat = getOutputFormat(jobCtx); + + outputFormat.checkOutputSpecs(jobCtx); + + OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); + + if (committer != null) + committer.setupJob(jobCtx); + } + catch (ClassNotFoundException | IOException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java new file mode 100644 index 0000000..76a3329 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop API v2 splitter. + */ +public class HadoopV2Splitter { + /** */ + private static final String[] EMPTY_HOSTS = {}; + + /** + * @param ctx Job context. + * @return Collection of mapped splits. + * @throws IgniteCheckedException If mapping failed. 
+ */ + public static Collection<HadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException { + try { + InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration()); + + assert format != null; + + List<InputSplit> splits = format.getSplits(ctx); + + Collection<HadoopInputSplit> res = new ArrayList<>(splits.size()); + + int id = 0; + + for (InputSplit nativeSplit : splits) { + if (nativeSplit instanceof FileSplit) { + FileSplit s = (FileSplit)nativeSplit; + + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + } + else + res.add(HadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations())); + + id++; + } + + return res; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } + + /** + * @param clsName Input split class name. + * @param in Input stream. + * @param hosts Optional hosts. + * @return File block or {@code null} if it is not a {@link FileSplit} instance. + * @throws IgniteCheckedException If failed. + */ + public static HadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts) + throws IgniteCheckedException { + if (!FileSplit.class.getName().equals(clsName)) + return null; + + FileSplit split = new FileSplit(); + + try { + split.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + if (hosts == null) + hosts = EMPTY_HOSTS; + + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java new file mode 100644 index 0000000..5ade3fb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Extended Hadoop v2 task. + */ +public abstract class HadoopV2Task extends HadoopTask { + /** Hadoop context. 
*/
+    private HadoopV2Context hadoopCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV2Task(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        hadoopCtx = new HadoopV2Context(ctx);
+
+        run0(ctx);
+    }
+
+    /**
+     * Internal task routine.
+     *
+     * @param taskCtx Task context.
+     * @throws IgniteCheckedException If task execution failed.
+     */
+    protected abstract void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException;
+
+    /**
+     * @return Hadoop context.
+     */
+    protected HadoopV2Context hadoopContext() {
+        return hadoopCtx;
+    }
+
+    /**
+     * Create and configure an OutputFormat instance.
+     *
+     * @param jobCtx Job context.
+     * @return Instance of the OutputFormat specified in the job configuration.
+     * @throws ClassNotFoundException If specified class not found.
+     */
+    protected OutputFormat getOutputFormat(JobContext jobCtx) throws ClassNotFoundException {
+        return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), hadoopContext().getConfiguration());
+    }
+
+    /**
+     * Put a writer into the Hadoop context and return the associated output format instance.
+     *
+     * @param jobCtx Job context.
+     * @return Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected OutputFormat prepareWriter(JobContext jobCtx)
+        throws IgniteCheckedException, InterruptedException {
+        try {
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            assert outputFormat != null;
+
+            OutputCommitter outCommitter = outputFormat.getOutputCommitter(hadoopCtx);
+
+            if (outCommitter != null)
+                outCommitter.setupTask(hadoopCtx);
+
+            RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx);
+
+            hadoopCtx.writer(writer);
+
+            return outputFormat;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Closes writer.
+     *
+     * @throws Exception If closing the writer failed.
+     */
+    protected void closeWriter() throws Exception {
+        RecordWriter writer = hadoopCtx.writer();
+
+        if (writer != null)
+            writer.close(hadoopCtx);
+    }
+
+    /**
+     * Setup task.
+     *
+     * @param outputFormat Output format.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void setup(@Nullable OutputFormat outputFormat) throws IOException, InterruptedException {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx);
+        }
+    }
+
+    /**
+     * Commit task.
+     *
+     * @param outputFormat Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void commit(@Nullable OutputFormat outputFormat) throws IgniteCheckedException, IOException, InterruptedException {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            OutputCommitter outputCommitter = outputFormat.getOutputCommitter(hadoopCtx);
+
+            if (outputCommitter.needsTaskCommit(hadoopCtx))
+                outputCommitter.commitTask(hadoopCtx);
+        }
+    }
+
+    /**
+     * Abort task.
+     *
+     * @param outputFormat Output format.
+ */ + protected void abort(@Nullable OutputFormat outputFormat) { + if (hadoopCtx.writer() != null) { + assert outputFormat != null; + + try { + outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx); + } + catch (IOException ignore) { + // Ignore. + } + catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + hadoopCtx.cancel(); + } +}
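The map, reduce and setup tasks above all drive the same writer lifecycle implemented in HadoopV2Task: prepare a writer, run the user code, close the writer, then commit on success or abort on failure. The following self-contained sketch condenses that protocol; it is an illustration only, not code from this commit, and the Committer and TaskBody interfaces are simplified stand-ins for Hadoop's OutputCommitter and the task body.

import java.io.IOException;

/** Condensed model of the HadoopV2Task writer lifecycle (illustration only). */
public class TaskLifecycleSketch {
    /** Simplified stand-in for the user code a task runs (mapper, reducer, ...). */
    interface TaskBody {
        void run() throws Exception;
    }

    /** Simplified stand-in for Hadoop's OutputCommitter contract. */
    interface Committer {
        void setupTask() throws IOException;
        boolean needsTaskCommit() throws IOException;
        void commitTask() throws IOException;
        void abortTask() throws IOException;
    }

    /**
     * Mirrors the tasks above: set up, run the body, commit on success,
     * abort on failure, re-interrupting the thread where appropriate.
     */
    static void execute(Committer committer, TaskBody body) throws Exception {
        Exception err = null;

        committer.setupTask();

        try {
            body.run();

            if (committer.needsTaskCommit())
                committer.commitTask();
        }
        catch (InterruptedException e) {
            err = e;

            Thread.currentThread().interrupt();

            throw e;
        }
        catch (Exception e) {
            err = e;

            throw e;
        }
        finally {
            if (err != null) {
                try {
                    committer.abortTask(); // Best effort, like abort(outputFormat) above.
                }
                catch (IOException ignore) {
                    // Ignore abort failures so the original error propagates.
                }
            }
        }
    }
}

The separate err flag is the same device the committed tasks use: it lets the finally block distinguish the failure path (where the task must be aborted) from the success path (where commit has already run), without swallowing the original exception.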