http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java deleted file mode 100644 index 48558fc..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Fake manager for shutdown hooks. - */ -public class GridHadoopShutdownHookManager { - /** */ - private static final GridHadoopShutdownHookManager MGR = new GridHadoopShutdownHookManager(); - - /** - * Return <code>ShutdownHookManager</code> singleton. - * - * @return <code>ShutdownHookManager</code> singleton. - */ - public static GridHadoopShutdownHookManager get() { - return MGR; - } - - /** */ - private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>()); - - /** */ - private AtomicBoolean shutdownInProgress = new AtomicBoolean(false); - - /** - * Singleton. - */ - private GridHadoopShutdownHookManager() { - // No-op. - } - - /** - * Adds a shutdownHook with a priority, the higher the priority - * the earlier will run. ShutdownHooks with same priority run - * in a non-deterministic order. - * - * @param shutdownHook shutdownHook <code>Runnable</code> - * @param priority priority of the shutdownHook. - */ - public void addShutdownHook(Runnable shutdownHook, int priority) { - if (shutdownHook == null) - throw new IllegalArgumentException("shutdownHook cannot be NULL"); - - hooks.add(shutdownHook); - } - - /** - * Removes a shutdownHook. - * - * @param shutdownHook shutdownHook to remove. - * @return TRUE if the shutdownHook was registered and removed, - * FALSE otherwise. - */ - public boolean removeShutdownHook(Runnable shutdownHook) { - return hooks.remove(shutdownHook); - } - - /** - * Indicates if a shutdownHook is registered or not. - * - * @param shutdownHook shutdownHook to check if registered. - * @return TRUE/FALSE depending if the shutdownHook is is registered. - */ - public boolean hasShutdownHook(Runnable shutdownHook) { - return hooks.contains(shutdownHook); - } - - /** - * Indicates if shutdown is in progress or not. - * - * @return TRUE if the shutdown is in progress, otherwise FALSE. - */ - public boolean isShutdownInProgress() { - return shutdownInProgress.get(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java deleted file mode 100644 index 57edfa9..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * The wrapper for native hadoop input splits. - * - * Warning!! This class must not depend on any Hadoop classes directly or indirectly. - */ -public class GridHadoopSplitWrapper extends GridHadoopInputSplit { - /** */ - private static final long serialVersionUID = 0L; - - /** Native hadoop input split. */ - private byte[] bytes; - - /** */ - private String clsName; - - /** Internal ID */ - private int id; - - /** - * Creates new split wrapper. - */ - public GridHadoopSplitWrapper() { - // No-op. - } - - /** - * Creates new split wrapper. - * - * @param id Split ID. - * @param clsName Class name. - * @param bytes Serialized class. - * @param hosts Hosts where split is located. - */ - public GridHadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) { - assert hosts != null; - assert clsName != null; - assert bytes != null; - - this.hosts = hosts; - this.id = id; - - this.clsName = clsName; - this.bytes = bytes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(id); - - out.writeUTF(clsName); - U.writeByteArray(out, bytes); - } - - /** - * @return Class name. - */ - public String className() { - return clsName; - } - - /** - * @return Class bytes. - */ - public byte[] bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = in.readInt(); - - clsName = in.readUTF(); - bytes = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopSplitWrapper that = (GridHadoopSplitWrapper)o; - - return id == that.id; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java deleted file mode 100644 index 38be3da..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; - -import java.io.*; - -/** - * Hadoop cleanup task (commits or aborts job). - */ -public class GridHadoopV2CleanupTask extends GridHadoopV2Task { - /** Abort flag. */ - private final boolean abort; - - /** - * @param taskInfo Task info. - * @param abort Abort flag. - */ - public GridHadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) { - super(taskInfo); - - this.abort = abort; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException { - JobContextImpl jobCtx = taskCtx.jobContext(); - - try { - OutputFormat outputFormat = getOutputFormat(jobCtx); - - OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); - - if (committer != null) { - if (abort) - committer.abortJob(jobCtx, JobStatus.State.FAILED); - else - committer.commitJob(jobCtx); - } - } - catch (ClassNotFoundException | IOException e) { - throw new IgniteCheckedException(e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java deleted file mode 100644 index 287b10f..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.task.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks. - */ -public class GridHadoopV2Context extends JobContextImpl implements MapContext, ReduceContext { - /** Input reader to overriding of GridHadoopTaskContext input. */ - private RecordReader reader; - - /** Output writer to overriding of GridHadoopTaskContext output. */ - private RecordWriter writer; - - /** Output is provided by executor environment. */ - private final GridHadoopTaskOutput output; - - /** Input is provided by executor environment. */ - private final GridHadoopTaskInput input; - - /** Unique identifier for a task attempt. */ - private final TaskAttemptID taskAttemptID; - - /** Indicates that this task is to be cancelled. */ - private volatile boolean cancelled; - - /** Input split. */ - private InputSplit inputSplit; - - /** */ - private final GridHadoopTaskContext ctx; - - /** */ - private String status; - - /** - * @param ctx Context for IO operations. - */ - public GridHadoopV2Context(GridHadoopV2TaskContext ctx) { - super(ctx.jobConf(), ctx.jobContext().getJobID()); - - taskAttemptID = ctx.attemptId(); - - conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString()); - conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString()); - - output = ctx.output(); - input = ctx.input(); - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public InputSplit getInputSplit() { - if (inputSplit == null) { - GridHadoopInputSplit split = ctx.taskInfo().inputSplit(); - - if (split == null) - return null; - - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split; - - inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); - } - else if (split instanceof GridHadoopExternalSplit) - throw new UnsupportedOperationException(); // TODO - else if (split instanceof GridHadoopSplitWrapper) - inputSplit = (InputSplit)GridHadoopUtils.unwrapSplit((GridHadoopSplitWrapper)split); - else - throw new IllegalStateException(); - } - - return inputSplit; - } - - /** {@inheritDoc} */ - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); - - return reader.nextKeyValue(); - } - - /** {@inheritDoc} */ - @Override public Object getCurrentKey() throws IOException, InterruptedException { - if (reader != null) - return reader.getCurrentKey(); - - return input.key(); - } - - /** {@inheritDoc} */ - @Override public Object getCurrentValue() throws IOException, InterruptedException { - return reader.getCurrentValue(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void write(Object key, Object val) throws IOException, InterruptedException { - if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); - - if (writer != null) - writer.write(key, val); - else { - try { - output.write(key, val); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - } - - /** {@inheritDoc} */ - @Override public OutputCommitter getOutputCommitter() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public TaskAttemptID getTaskAttemptID() { - return taskAttemptID; - } - - /** {@inheritDoc} */ - @Override public void setStatus(String msg) { - status = msg; - } - - /** {@inheritDoc} */ - @Override public String getStatus() { - return status; - } - - /** {@inheritDoc} */ - @Override public float getProgress() { - return 0.5f; // TODO - } - - /** {@inheritDoc} */ - @Override public Counter getCounter(Enum<?> cntrName) { - return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name()); - } - - /** {@inheritDoc} */ - @Override public Counter getCounter(String grpName, String cntrName) { - return new GridHadoopV2Counter(ctx.counter(grpName, cntrName, GridHadoopLongCounter.class)); - } - - /** {@inheritDoc} */ - @Override public void progress() { - // No-op. - } - - /** - * Overrides default input data reader. - * - * @param reader New reader. - */ - public void reader(RecordReader reader) { - this.reader = reader; - } - - /** {@inheritDoc} */ - @Override public boolean nextKey() throws IOException, InterruptedException { - if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); - - return input.next(); - } - - /** {@inheritDoc} */ - @Override public Iterable getValues() throws IOException, InterruptedException { - return new Iterable() { - @Override public Iterator iterator() { - return input.values(); - } - }; - } - - /** - * @return Overridden output data writer. - */ - public RecordWriter writer() { - return writer; - } - - /** - * Overrides default output data writer. - * - * @param writer New writer. - */ - public void writer(RecordWriter writer) { - this.writer = writer; - } - - /** - * Cancels the task by stop the IO. - */ - public void cancel() { - cancelled = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java deleted file mode 100644 index 6bf8a44..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Counter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; - -import java.io.*; - -/** - * Adapter from own counter implementation into Hadoop API Counter od version 2.0. - */ -public class GridHadoopV2Counter implements Counter { - /** Delegate. */ - private final GridHadoopLongCounter cntr; - - /** - * Creates new instance with given delegate. - * - * @param cntr Internal counter. - */ - public GridHadoopV2Counter(GridHadoopLongCounter cntr) { - assert cntr != null : "counter must be non-null"; - - this.cntr = cntr; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public String getName() { - return cntr.name(); - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public long getValue() { - return cntr.value(); - } - - /** {@inheritDoc} */ - @Override public void setValue(long val) { - cntr.value(val); - } - - /** {@inheritDoc} */ - @Override public void increment(long incr) { - cntr.increment(incr); - } - - /** {@inheritDoc} */ - @Override public Counter getUnderlyingCounter() { - return this; - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java deleted file mode 100644 index 7c36948..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.split.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.internal.processors.hadoop.v1.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jdk8.backport.*; - -import java.io.*; -import java.lang.reflect.*; -import java.util.*; -import java.util.Queue; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Hadoop job implementation for v2 API. - */ -public class GridHadoopV2Job implements GridHadoopJob { - /** */ - private final JobConf jobConf; - - /** */ - private final JobContextImpl jobCtx; - - /** Hadoop job ID. */ - private final GridHadoopJobId jobId; - - /** Job info. */ - protected GridHadoopJobInfo jobInfo; - - /** */ - private final JobID hadoopJobID; - - /** */ - private final GridHadoopV2JobResourceManager rsrcMgr; - - /** */ - private final ConcurrentMap<T2<GridHadoopTaskType, Integer>, GridFutureAdapter<GridHadoopTaskContext>> ctxs = - new ConcurrentHashMap8<>(); - - /** Pooling task context class and thus class loading environment. */ - private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); - - /** Local node ID */ - private UUID locNodeId; - - /** Serialized JobConf. */ - private volatile byte[] jobConfData; - - /** - * @param jobId Job ID. - * @param jobInfo Job info. - * @param log Logger. - */ - public GridHadoopV2Job(GridHadoopJobId jobId, final GridHadoopDefaultJobInfo jobInfo, IgniteLogger log) { - assert jobId != null; - assert jobInfo != null; - - this.jobId = jobId; - this.jobInfo = jobInfo; - - hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); - - GridHadoopClassLoader clsLdr = (GridHadoopClassLoader)getClass().getClassLoader(); - - // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(clsLdr); - - jobConf = new JobConf(); - - GridHadoopFileSystemsUtils.setupFileSystems(jobConf); - - Thread.currentThread().setContextClassLoader(null); - - for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) - jobConf.set(e.getKey(), e.getValue()); - - jobCtx = new JobContextImpl(jobConf, hadoopJobID); - - rsrcMgr = new GridHadoopV2JobResourceManager(jobId, jobCtx, log); - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobId id() { - return jobId; - } - - /** {@inheritDoc} */ - @Override public GridHadoopJobInfo info() { - return jobInfo; - } - - /** {@inheritDoc} */ - @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException { - Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); - - try { - String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR); - - if (jobDirPath == null) { // Probably job was submitted not by hadoop client. - // Assume that we have needed classes and try to generate input splits ourself. - if (jobConf.getUseNewMapper()) - return GridHadoopV2Splitter.splitJob(jobCtx); - else - return GridHadoopV1Splitter.splitJob(jobConf); - } - - Path jobDir = new Path(jobDirPath); - - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) { - JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, - jobDir); - - if (F.isEmpty(metaInfos)) - throw new IgniteCheckedException("No input splits found."); - - Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir); - - try (FSDataInputStream in = fs.open(splitsFile)) { - Collection<GridHadoopInputSplit> res = new ArrayList<>(metaInfos.length); - - for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) { - long off = metaInfo.getStartOffset(); - - String[] hosts = metaInfo.getLocations(); - - in.seek(off); - - String clsName = Text.readString(in); - - GridHadoopFileBlock block = GridHadoopV1Splitter.readFileBlock(clsName, in, hosts); - - if (block == null) - block = GridHadoopV2Splitter.readFileBlock(clsName, in, hosts); - - res.add(block != null ? block : new GridHadoopExternalSplit(hosts, off)); - } - - return res; - } - } - catch (Throwable e) { - throw transformException(e); - } - } - finally { - Thread.currentThread().setContextClassLoader(null); - } - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException { - T2<GridHadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); - - GridFutureAdapter<GridHadoopTaskContext> fut = ctxs.get(locTaskId); - - if (fut != null) - return fut.get(); - - GridFutureAdapter<GridHadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); - - if (old != null) - return old.get(); - - Class<?> cls = taskCtxClsPool.poll(); - - try { - if (cls == null) { - // If there is no pooled class, then load new one. - GridHadoopClassLoader ldr = new GridHadoopClassLoader(rsrcMgr.classPath()); - - cls = ldr.loadClass(GridHadoopV2TaskContext.class.getName()); - } - - Constructor<?> ctr = cls.getConstructor(GridHadoopTaskInfo.class, GridHadoopJob.class, - GridHadoopJobId.class, UUID.class, DataInput.class); - - if (jobConfData == null) - synchronized(jobConf) { - if (jobConfData == null) { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - jobConf.write(new DataOutputStream(buf)); - - jobConfData = buf.toByteArray(); - } - } - - GridHadoopTaskContext res = (GridHadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, - new DataInputStream(new ByteArrayInputStream(jobConfData))); - - fut.onDone(res); - - return res; - } - catch (Throwable e) { - IgniteCheckedException te = transformException(e); - - fut.onDone(te); - - throw te; - } - } - - /** {@inheritDoc} */ - @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { - this.locNodeId = locNodeId; - - Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); - - try { - rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); - } - finally { - Thread.currentThread().setContextClassLoader(null); - } - } - - /** {@inheritDoc} */ - @Override public void dispose(boolean external) throws IgniteCheckedException { - if (rsrcMgr != null && !external) { - File jobLocDir = jobLocalDir(locNodeId, jobId); - - if (jobLocDir.exists()) - U.delete(jobLocDir); - } - } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { - rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); - } - - /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException { - GridHadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); - - taskCtxClsPool.offer(ctx.getClass()); - - File locDir = taskLocalDir(locNodeId, info); - - if (locDir.exists()) - U.delete(locDir); - } - - /** {@inheritDoc} */ - @Override public void cleanupStagingDirectory() { - if (rsrcMgr != null) - rsrcMgr.cleanupStagingDirectory(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java deleted file mode 100644 index be619c7..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2JobResourceManager.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.nio.file.*; -import java.util.*; - -/** - * Provides all resources are needed to the job execution. Downloads the main jar, the configuration and additional - * files are needed to be placed on local files system. - */ -public class GridHadoopV2JobResourceManager { - /** Hadoop job context. */ - private final JobContextImpl ctx; - - /** Logger. */ - private final IgniteLogger log; - - /** Job ID. */ - private final GridHadoopJobId jobId; - - /** Class path list. */ - private URL[] clsPath; - - /** Set of local resources. */ - private final Collection<File> rsrcSet = new HashSet<>(); - - /** Staging directory to delivery job jar and config to the work nodes. */ - private Path stagingDir; - - /** - * Creates new instance. - * @param jobId Job ID. - * @param ctx Hadoop job context. - * @param log Logger. - */ - public GridHadoopV2JobResourceManager(GridHadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) { - this.jobId = jobId; - this.ctx = ctx; - this.log = log.getLogger(GridHadoopV2JobResourceManager.class); - } - - /** - * Set working directory in local file system. - * - * @param dir Working directory. - * @throws IOException If fails. - */ - private void setLocalFSWorkingDirectory(File dir) throws IOException { - JobConf cfg = ctx.getJobConf(); - - Thread.currentThread().setContextClassLoader(cfg.getClassLoader()); - - try { - cfg.set(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); - - if(!cfg.getBoolean("fs.file.impl.disable.cache", false)) - FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); - } - finally { - Thread.currentThread().setContextClassLoader(null); - } - } - - /** - * Prepare job resources. Resolve the classpath list and download it if needed. - * - * @param download {@code true} If need to download resources. - * @param jobLocDir Work directory for the job. - * @throws IgniteCheckedException If failed. - */ - public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException { - try { - if (jobLocDir.exists()) - throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath()); - - JobConf cfg = ctx.getJobConf(); - - String mrDir = cfg.get("mapreduce.job.dir"); - - if (mrDir != null) { - stagingDir = new Path(new URI(mrDir)); - - if (download) { - FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg); - - if (!fs.exists(stagingDir)) - throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + - stagingDir); - - if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) - throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " + - "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']'); - } - - File jarJobFile = new File(jobLocDir, "job.jar"); - - Collection<URL> clsPathUrls = new ArrayList<>(); - - clsPathUrls.add(jarJobFile.toURI().toURL()); - - rsrcSet.add(jarJobFile); - rsrcSet.add(new File(jobLocDir, "job.xml")); - - processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES); - processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES); - processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null); - processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null); - - if (!clsPathUrls.isEmpty()) { - clsPath = new URL[clsPathUrls.size()]; - - clsPathUrls.toArray(clsPath); - } - } - else if (!jobLocDir.mkdirs()) - throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath()); - - setLocalFSWorkingDirectory(jobLocDir); - } - catch (URISyntaxException | IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * Process list of resources. - * - * @param jobLocDir Job working directory. - * @param files Array of {@link java.net.URI} or {@link org.apache.hadoop.fs.Path} to process resources. - * @param download {@code true}, if need to download. Process class path only else. - * @param extract {@code true}, if need to extract archive. - * @param clsPathUrls Collection to add resource as classpath resource. - * @param rsrcNameProp Property for resource name array setting. - * @throws IOException If failed. - */ - private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract, - @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException { - if (F.isEmptyOrNulls(files)) - return; - - Collection<String> res = new ArrayList<>(); - - for (Object pathObj : files) { - String locName = null; - Path srcPath; - - if (pathObj instanceof URI) { - URI uri = (URI)pathObj; - - locName = uri.getFragment(); - - srcPath = new Path(uri); - } - else - srcPath = (Path)pathObj; - - if (locName == null) - locName = srcPath.getName(); - - File dstPath = new File(jobLocDir.getAbsolutePath(), locName); - - res.add(locName); - - rsrcSet.add(dstPath); - - if (clsPathUrls != null) - clsPathUrls.add(dstPath.toURI().toURL()); - - if (!download) - continue; - - JobConf cfg = ctx.getJobConf(); - - FileSystem dstFs = FileSystem.getLocal(cfg); - - FileSystem srcFs = srcPath.getFileSystem(cfg); - - if (extract) { - File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); - - if (!archivesPath.exists() && !archivesPath.mkdir()) - throw new IOException("Failed to create directory " + - "[path=" + archivesPath + ", jobId=" + jobId + ']'); - - File archiveFile = new File(archivesPath, locName); - - FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg); - - String archiveNameLC = archiveFile.getName().toLowerCase(); - - if (archiveNameLC.endsWith(".jar")) - RunJar.unJar(archiveFile, dstPath); - else if (archiveNameLC.endsWith(".zip")) - FileUtil.unZip(archiveFile, dstPath); - else if (archiveNameLC.endsWith(".tar.gz") || - archiveNameLC.endsWith(".tgz") || - archiveNameLC.endsWith(".tar")) - FileUtil.unTar(archiveFile, dstPath); - else - throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']'); - } - else - FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg); - } - - if (!res.isEmpty() && rsrcNameProp != null) - ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()])); - } - - /** - * Prepares working directory for the task. - * - * <ul> - * <li>Creates working directory.</li> - * <li>Creates symbolic links to all job resources in working directory.</li> - * </ul> - * - * @param path Path to working directory of the task. - * @throws IgniteCheckedException If fails. - */ - public void prepareTaskWorkDir(File path) throws IgniteCheckedException { - try { - if (path.exists()) - throw new IOException("Task local directory already exists: " + path); - - if (!path.mkdir()) - throw new IOException("Failed to create directory: " + path); - - for (File resource : rsrcSet) { - File symLink = new File(path, resource.getName()); - - try { - Files.createSymbolicLink(symLink.toPath(), resource.toPath()); - } - catch (IOException e) { - String msg = "Unable to create symlink \"" + symLink + "\" to \"" + resource + "\"."; - - if (U.isWindows() && e instanceof FileSystemException) - msg += "\n\nAbility to create symbolic links is required!\n" + - "On Windows platform you have to grant permission 'Create symbolic links'\n" + - "to your user or run the Accelerator as Administrator.\n"; - - throw new IOException(msg, e); - } - } - } - catch (IOException e) { - throw new IgniteCheckedException("Unable to prepare local working directory for the task " + - "[jobId=" + jobId + ", path=" + path+ ']', e); - } - } - - /** - * Cleans up job staging directory. - */ - public void cleanupStagingDirectory() { - try { - if (stagingDir != null) - stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true); - } - catch (Exception e) { - log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); - } - } - - /** - * Returns array of class path for current job. - * - * @return Class path collection. - */ - @Nullable public URL[] classPath() { - return clsPath; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java deleted file mode 100644 index be0bea2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2MapTask.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.hadoop.mapreduce.lib.map.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; - -/** - * Hadoop map task implementation for v2 API. - */ -public class GridHadoopV2MapTask extends GridHadoopV2Task { - /** - * @param taskInfo Task info. - */ - public GridHadoopV2MapTask(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"ConstantConditions", "unchecked"}) - @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException { - GridHadoopInputSplit split = info().inputSplit(); - - InputSplit nativeSplit; - - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock block = (GridHadoopFileBlock)split; - - nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null); - } - else - nativeSplit = (InputSplit)taskCtx.getNativeSplit(split); - - assert nativeSplit != null; - - OutputFormat outputFormat = null; - Exception err = null; - - JobContextImpl jobCtx = taskCtx.jobContext(); - - try { - InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(), - hadoopContext().getConfiguration()); - - RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext()); - - reader.initialize(nativeSplit, hadoopContext()); - - hadoopContext().reader(reader); - - GridHadoopJobInfo jobInfo = taskCtx.job().info(); - - outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx); - - Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration()); - - try { - mapper.run(new WrappedMapper().getMapContext(hadoopContext())); - } - finally { - closeWriter(); - } - - commit(outputFormat); - } - catch (InterruptedException e) { - err = e; - - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - catch (Exception e) { - err = e; - - throw new IgniteCheckedException(e); - } - finally { - if (err != null) - abort(outputFormat); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java deleted file mode 100644 index 0883520..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Partitioner.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.internal.processors.hadoop.*; - -/** - * Hadoop partitioner adapter for v2 API. - */ -public class GridHadoopV2Partitioner implements GridHadoopPartitioner { - /** Partitioner instance. */ - private Partitioner<Object, Object> part; - - /** - * @param cls Hadoop partitioner class. - * @param conf Job configuration. - */ - public GridHadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) { - part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); - } - - /** {@inheritDoc} */ - @Override public int partition(Object key, Object val, int parts) { - return part.getPartition(key, val, parts); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java deleted file mode 100644 index 146e05c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2ReduceTask.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.reduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; - -/** - * Hadoop reduce task implementation for v2 API. - */ -public class GridHadoopV2ReduceTask extends GridHadoopV2Task { - /** {@code True} if reduce, {@code false} if combine. */ - private final boolean reduce; - - /** - * Constructor. - * - * @param taskInfo Task info. - * @param reduce {@code True} if reduce, {@code false} if combine. - */ - public GridHadoopV2ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) { - super(taskInfo); - - this.reduce = reduce; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"ConstantConditions", "unchecked"}) - @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException { - OutputFormat outputFormat = null; - Exception err = null; - - JobContextImpl jobCtx = taskCtx.jobContext(); - - try { - outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null; - - Reducer reducer = ReflectionUtils.newInstance(reduce ? jobCtx.getReducerClass() : jobCtx.getCombinerClass(), - jobCtx.getConfiguration()); - - try { - reducer.run(new WrappedReducer().getReducerContext(hadoopContext())); - } - finally { - closeWriter(); - } - - commit(outputFormat); - } - catch (InterruptedException e) { - err = e; - - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - catch (Exception e) { - err = e; - - throw new IgniteCheckedException(e); - } - finally { - if (err != null) - abort(outputFormat); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java deleted file mode 100644 index 54eda25..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2SetupTask.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; - -import java.io.*; - -/** - * Hadoop setup task (prepares job). - */ -public class GridHadoopV2SetupTask extends GridHadoopV2Task { - /** - * Constructor. - * - * @param taskInfo task info. - */ - public GridHadoopV2SetupTask(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override protected void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException { - try { - JobContextImpl jobCtx = taskCtx.jobContext(); - - OutputFormat outputFormat = getOutputFormat(jobCtx); - - outputFormat.checkOutputSpecs(jobCtx); - - OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); - - if (committer != null) - committer.setupJob(jobCtx); - } - catch (ClassNotFoundException | IOException e) { - throw new IgniteCheckedException(e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java deleted file mode 100644 index e8ce70b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop API v2 splitter. - */ -public class GridHadoopV2Splitter { - /** */ - private static final String[] EMPTY_HOSTS = {}; - - /** - * @param ctx Job context. - * @return Collection of mapped splits. - * @throws IgniteCheckedException If mapping failed. - */ - public static Collection<GridHadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException { - try { - InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration()); - - assert format != null; - - List<InputSplit> splits = format.getSplits(ctx); - - Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.size()); - - int id = 0; - - for (InputSplit nativeSplit : splits) { - if (nativeSplit instanceof FileSplit) { - FileSplit s = (FileSplit)nativeSplit; - - res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); - } - else - res.add(GridHadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations())); - - id++; - } - - return res; - } - catch (IOException | ClassNotFoundException e) { - throw new IgniteCheckedException(e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } - - /** - * @param clsName Input split class name. - * @param in Input stream. - * @param hosts Optional hosts. - * @return File block or {@code null} if it is not a {@link FileSplit} instance. - * @throws IgniteCheckedException If failed. - */ - public static GridHadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts) - throws IgniteCheckedException { - if (!FileSplit.class.getName().equals(clsName)) - return null; - - FileSplit split = new FileSplit(); - - try { - split.readFields(in); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - if (hosts == null) - hosts = EMPTY_HOSTS; - - return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java deleted file mode 100644 index 37697c6..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Task.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Extended Hadoop v2 task. - */ -public abstract class GridHadoopV2Task extends GridHadoopTask { - /** Hadoop context. */ - private GridHadoopV2Context hadoopCtx; - - /** - * Constructor. - * - * @param taskInfo Task info. - */ - protected GridHadoopV2Task(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - hadoopCtx = new GridHadoopV2Context(ctx); - - run0(ctx); - } - - /** - * Internal task routine. - * - * @param taskCtx Task context. - * @throws IgniteCheckedException - */ - protected abstract void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException; - - /** - * @return hadoop context. - */ - protected GridHadoopV2Context hadoopContext() { - return hadoopCtx; - } - - /** - * Create and configure an OutputFormat instance. - * - * @param jobCtx Job context. - * @return Instance of OutputFormat is specified in job configuration. - * @throws ClassNotFoundException If specified class not found. - */ - protected OutputFormat getOutputFormat(JobContext jobCtx) throws ClassNotFoundException { - return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), hadoopContext().getConfiguration()); - } - - /** - * Put write into Hadoop context and return associated output format instance. - * - * @param jobCtx Job context. - * @return Output format. - * @throws IgniteCheckedException In case of Grid exception. - * @throws InterruptedException In case of interrupt. - */ - protected OutputFormat prepareWriter(JobContext jobCtx) - throws IgniteCheckedException, InterruptedException { - try { - OutputFormat outputFormat = getOutputFormat(jobCtx); - - assert outputFormat != null; - - OutputCommitter outCommitter = outputFormat.getOutputCommitter(hadoopCtx); - - if (outCommitter != null) - outCommitter.setupTask(hadoopCtx); - - RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx); - - hadoopCtx.writer(writer); - - return outputFormat; - } - catch (IOException | ClassNotFoundException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * Closes writer. - * - * @throws Exception If fails and logger hasn't been specified. - */ - protected void closeWriter() throws Exception { - RecordWriter writer = hadoopCtx.writer(); - - if (writer != null) - writer.close(hadoopCtx); - } - - /** - * Setup task. - * - * @param outputFormat Output format. - * @throws IOException In case of IO exception. - * @throws InterruptedException In case of interrupt. - */ - protected void setup(@Nullable OutputFormat outputFormat) throws IOException, InterruptedException { - if (hadoopCtx.writer() != null) { - assert outputFormat != null; - - outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx); - } - } - - /** - * Commit task. - * - * @param outputFormat Output format. - * @throws IgniteCheckedException In case of Grid exception. - * @throws IOException In case of IO exception. - * @throws InterruptedException In case of interrupt. - */ - protected void commit(@Nullable OutputFormat outputFormat) throws IgniteCheckedException, IOException, InterruptedException { - if (hadoopCtx.writer() != null) { - assert outputFormat != null; - - OutputCommitter outputCommitter = outputFormat.getOutputCommitter(hadoopCtx); - - if (outputCommitter.needsTaskCommit(hadoopCtx)) - outputCommitter.commitTask(hadoopCtx); - } - } - - /** - * Abort task. - * - * @param outputFormat Output format. - */ - protected void abort(@Nullable OutputFormat outputFormat) { - if (hadoopCtx.writer() != null) { - assert outputFormat != null; - - try { - outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx); - } - catch (IOException ignore) { - // Ignore. - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - hadoopCtx.cancel(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java deleted file mode 100644 index 41bd24a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java +++ /dev/null @@ -1,443 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskID; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.fs.*; -import org.apache.ignite.internal.processors.hadoop.v1.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.igfs.hadoop.IgfsHadoopParameters.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Context for task execution. - */ -public class GridHadoopV2TaskContext extends GridHadoopTaskContext { - /** */ - private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; - - /** - * Check for combiner grouping support (available since Hadoop 2.3). - */ - static { - boolean ok; - - try { - JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator"); - - ok = true; - } - catch (NoSuchMethodException ignore) { - ok = false; - } - - COMBINE_KEY_GROUPING_SUPPORTED = ok; - } - - /** Flag is set if new context-object code is used for running the mapper. */ - private final boolean useNewMapper; - - /** Flag is set if new context-object code is used for running the reducer. */ - private final boolean useNewReducer; - - /** Flag is set if new context-object code is used for running the combiner. */ - private final boolean useNewCombiner; - - /** */ - private final JobContextImpl jobCtx; - - /** Set if task is to cancelling. */ - private volatile boolean cancelled; - - /** Current task. */ - private volatile GridHadoopTask task; - - /** Local node ID */ - private UUID locNodeId; - - /** Counters for task. */ - private final GridHadoopCounters cntrs = new GridHadoopCountersImpl(); - - /** - * @param taskInfo Task info. - * @param job Job. - * @param jobId Job ID. - * @param locNodeId Local node ID. - * @param jobConfDataInput DataInput for read JobConf. - */ - public GridHadoopV2TaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job, GridHadoopJobId jobId, - @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { - super(taskInfo, job); - this.locNodeId = locNodeId; - - // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); - - try { - JobConf jobConf = new JobConf(); - - try { - jobConf.readFields(jobConfDataInput); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - // For map-reduce jobs prefer local writes. - jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true); - - jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId())); - - useNewMapper = jobConf.getUseNewMapper(); - useNewReducer = jobConf.getUseNewReducer(); - useNewCombiner = jobConf.getCombinerClass() == null; - } - finally { - Thread.currentThread().setContextClassLoader(null); - } - } - - /** {@inheritDoc} */ - @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) { - return cntrs.counter(grp, name, cls); - } - - /** {@inheritDoc} */ - @Override public GridHadoopCounters counters() { - return cntrs; - } - - /** - * Creates appropriate task from current task info. - * - * @return Task. - */ - private GridHadoopTask createTask() { - boolean isAbort = taskInfo().type() == GridHadoopTaskType.ABORT; - - switch (taskInfo().type()) { - case SETUP: - return useNewMapper ? new GridHadoopV2SetupTask(taskInfo()) : new GridHadoopV1SetupTask(taskInfo()); - - case MAP: - return useNewMapper ? new GridHadoopV2MapTask(taskInfo()) : new GridHadoopV1MapTask(taskInfo()); - - case REDUCE: - return useNewReducer ? new GridHadoopV2ReduceTask(taskInfo(), true) : - new GridHadoopV1ReduceTask(taskInfo(), true); - - case COMBINE: - return useNewCombiner ? new GridHadoopV2ReduceTask(taskInfo(), false) : - new GridHadoopV1ReduceTask(taskInfo(), false); - - case COMMIT: - case ABORT: - return useNewReducer ? new GridHadoopV2CleanupTask(taskInfo(), isAbort) : - new GridHadoopV1CleanupTask(taskInfo(), isAbort); - - default: - return null; - } - } - - /** {@inheritDoc} */ - @Override public void run() throws IgniteCheckedException { - try { - Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); - - try { - task = createTask(); - } - catch (Throwable e) { - throw transformException(e); - } - - if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); - - try { - task.run(this); - } - catch (Throwable e) { - throw transformException(e); - } - } - finally { - task = null; - - Thread.currentThread().setContextClassLoader(null); - } - } - - /** {@inheritDoc} */ - @Override public void cancel() { - cancelled = true; - - GridHadoopTask t = task; - - if (t != null) - t.cancel(); - } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment() throws IgniteCheckedException { - File locDir; - - switch(taskInfo().type()) { - case MAP: - case REDUCE: - job().prepareTaskEnvironment(taskInfo()); - - locDir = taskLocalDir(locNodeId, taskInfo()); - - break; - - default: - locDir = jobLocalDir(locNodeId, taskInfo().jobId()); - } - - Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); - - try { - FileSystem fs = FileSystem.get(jobConf()); - - GridHadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); - - LocalFileSystem locFs = FileSystem.getLocal(jobConf()); - - locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath())); - } - catch (Throwable e) { - throw transformException(e); - } - finally { - Thread.currentThread().setContextClassLoader(null); - } - } - - /** {@inheritDoc} */ - @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { - job().cleanupTaskEnvironment(taskInfo()); - } - - /** - * Creates Hadoop attempt ID. - * - * @return Attempt ID. - */ - public TaskAttemptID attemptId() { - TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber()); - - return new TaskAttemptID(tid, taskInfo().attempt()); - } - - /** - * @param type Task type. - * @return Hadoop task type. - */ - private TaskType taskType(GridHadoopTaskType type) { - switch (type) { - case SETUP: - return TaskType.JOB_SETUP; - case MAP: - case COMBINE: - return TaskType.MAP; - - case REDUCE: - return TaskType.REDUCE; - - case COMMIT: - case ABORT: - return TaskType.JOB_CLEANUP; - - default: - return null; - } - } - - /** - * Gets job configuration of the task. - * - * @return Job configuration. - */ - public JobConf jobConf() { - return jobCtx.getJobConf(); - } - - /** - * Gets job context of the task. - * - * @return Job context. - */ - public JobContextImpl jobContext() { - return jobCtx; - } - - /** {@inheritDoc} */ - @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException { - Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null); - - if (partClsOld != null) - return new GridHadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf()); - - try { - return new GridHadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf()); - } - catch (ClassNotFoundException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * Gets serializer for specified class. - * - * @param cls Class. - * @param jobConf Job configuration. - * @return Appropriate serializer. - */ - @SuppressWarnings("unchecked") - private GridHadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException { - A.notNull(cls, "cls"); - - SerializationFactory factory = new SerializationFactory(jobConf); - - Serialization<?> serialization = factory.getSerialization(cls); - - if (serialization == null) - throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName()); - - if (serialization.getClass() == WritableSerialization.class) - return new GridHadoopWritableSerialization((Class<? extends Writable>)cls); - - return new GridHadoopSerializationWrapper(serialization, cls); - } - - /** {@inheritDoc} */ - @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException { - return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf()); - } - - /** {@inheritDoc} */ - @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException { - return getSerialization(jobCtx.getMapOutputValueClass(), jobConf()); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> sortComparator() { - return (Comparator<Object>)jobCtx.getSortComparator(); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> groupComparator() { - Comparator<?> res; - - switch (taskInfo().type()) { - case COMBINE: - res = COMBINE_KEY_GROUPING_SUPPORTED ? - jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator(); - - break; - - case REDUCE: - res = jobContext().getGroupingComparator(); - - break; - - default: - return null; - } - - if (res != null && res.getClass() != sortComparator().getClass()) - return (Comparator<Object>)res; - - return null; - } - - /** - * @param split Split. - * @return Native Hadoop split. - * @throws IgniteCheckedException if failed. - */ - @SuppressWarnings("unchecked") - public Object getNativeSplit(GridHadoopInputSplit split) throws IgniteCheckedException { - if (split instanceof GridHadoopExternalSplit) - return readExternalSplit((GridHadoopExternalSplit)split); - - if (split instanceof GridHadoopSplitWrapper) - return unwrapSplit((GridHadoopSplitWrapper)split); - - throw new IllegalStateException("Unknown split: " + split); - } - - /** - * @param split External split. - * @return Native input split. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private Object readExternalSplit(GridHadoopExternalSplit split) throws IgniteCheckedException { - Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); - - try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf()); - FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { - - in.seek(split.offset()); - - String clsName = Text.readString(in); - - Class<?> cls = jobConf().getClassByName(clsName); - - assert cls != null; - - Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls); - - Deserializer deserializer = serialization.getDeserializer(cls); - - deserializer.open(in); - - Object res = deserializer.deserialize(null); - - deserializer.close(); - - assert res != null; - - return res; - } - catch (IOException | ClassNotFoundException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java deleted file mode 100644 index 4361ad4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopWritableSerialization.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.io.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Optimized serialization for Hadoop {@link Writable} types. - */ -public class GridHadoopWritableSerialization implements GridHadoopSerialization { - /** */ - private final Class<? extends Writable> cls; - - /** - * @param cls Class. - */ - public GridHadoopWritableSerialization(Class<? extends Writable> cls) { - assert cls != null; - - this.cls = cls; - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { - assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass(); - - try { - ((Writable)obj).write(out); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { - Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj); - - try { - w.readFields(in); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return w; - } - - /** {@inheritDoc} */ - @Override public void close() { - // No-op. - } -}