http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java deleted file mode 100644 index 2a38684..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Hadoop output collector. - */ -public class GridHadoopV1OutputCollector implements OutputCollector { - /** Job configuration. */ - private final JobConf jobConf; - - /** Task context. 
*/ - private final GridHadoopTaskContext taskCtx; - - /** Optional direct writer. */ - private final RecordWriter writer; - - /** Task attempt. */ - private final TaskAttemptID attempt; - - /** - * @param jobConf Job configuration. - * @param taskCtx Task context. - * @param directWrite Direct write flag. - * @param fileName File name. - * @throws IOException In case of IO exception. - */ - GridHadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite, - @Nullable String fileName, TaskAttemptID attempt) throws IOException { - this.jobConf = jobConf; - this.taskCtx = taskCtx; - this.attempt = attempt; - - if (directWrite) { - jobConf.set("mapreduce.task.attempt.id", attempt.toString()); - - OutputFormat outFormat = jobConf.getOutputFormat(); - - writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL); - } - else - writer = null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void collect(Object key, Object val) throws IOException { - if (writer != null) - writer.write(key, val); - else { - try { - taskCtx.output().write(key, val); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - } - - /** - * Close writer. - * - * @throws IOException In case of IO exception. - */ - public void closeWriter() throws IOException { - if (writer != null) - writer.close(Reporter.NULL); - } - - /** - * Setup task. - * - * @throws IOException If failed. - */ - public void setup() throws IOException { - if (writer != null) - jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt)); - } - - /** - * Commit task. - * - * @throws IOException In failed. 
- */ - public void commit() throws IOException { - if (writer != null) { - OutputCommitter outputCommitter = jobConf.getOutputCommitter(); - - TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt); - - if (outputCommitter.needsTaskCommit(taskCtx)) - outputCommitter.commitTask(taskCtx); - } - } - - /** - * Abort task. - */ - public void abort() { - try { - if (writer != null) - jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt)); - } - catch (IOException ignore) { - // No-op. - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java deleted file mode 100644 index 688ccef..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.internal.processors.hadoop.*; - -/** - * Hadoop partitioner adapter for v1 API. - */ -public class GridHadoopV1Partitioner implements GridHadoopPartitioner { - /** Partitioner instance. */ - private Partitioner<Object, Object> part; - - /** - * @param cls Hadoop partitioner class. - * @param conf Job configuration. 
- */ - public GridHadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { - part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); - } - - /** {@inheritDoc} */ - @Override public int partition(Object key, Object val, int parts) { - return part.getPartition(key, val, parts); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java deleted file mode 100644 index 3aca637..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -/** - * Hadoop reduce task implementation for v1 API. - */ -public class GridHadoopV1ReduceTask extends GridHadoopV1Task { - /** {@code True} if reduce, {@code false} if combine. */ - private final boolean reduce; - - /** - * Constructor. - * - * @param taskInfo Task info. - * @param reduce {@code True} if reduce, {@code false} if combine. - */ - public GridHadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) { - super(taskInfo); - - this.reduce = reduce; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopJob job = taskCtx.job(); - - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - JobConf jobConf = ctx.jobConf(); - - GridHadoopTaskInput input = taskCtx.input(); - - GridHadoopV1OutputCollector collector = null; - - try { - collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); - - Reducer reducer = ReflectionUtils.newInstance(reduce ? 
jobConf.getReducerClass() : jobConf.getCombinerClass(), - jobConf); - - assert reducer != null; - - try { - try { - while (input.next()) { - if (isCancelled()) - throw new HadoopTaskCancelledException("Reduce task cancelled."); - - reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); - } - } - finally { - reducer.close(); - } - } - finally { - collector.closeWriter(); - } - - collector.commit(); - } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java deleted file mode 100644 index 791ccdc..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; - -/** - * Hadoop reporter implementation for v1 API. - */ -public class GridHadoopV1Reporter implements Reporter { - /** Context. */ - private final GridHadoopTaskContext ctx; - - /** - * Creates new instance. - * - * @param ctx Context. - */ - public GridHadoopV1Reporter(GridHadoopTaskContext ctx) { - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public void setStatus(String status) { - // TODO - } - - /** {@inheritDoc} */ - @Override public Counters.Counter getCounter(Enum<?> name) { - return getCounter(name.getDeclaringClass().getName(), name.name()); - } - - /** {@inheritDoc} */ - @Override public Counters.Counter getCounter(String grp, String name) { - return new GridHadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class)); - } - - /** {@inheritDoc} */ - @Override public void incrCounter(Enum<?> key, long amount) { - getCounter(key).increment(amount); - } - - /** {@inheritDoc} */ - @Override public void incrCounter(String grp, String cntr, long amount) { - getCounter(grp, cntr).increment(amount); - } - - /** {@inheritDoc} */ - @Override public InputSplit getInputSplit() throws UnsupportedOperationException { - throw new UnsupportedOperationException("reporter has no input"); // TODO - } - - /** {@inheritDoc} */ - @Override public float getProgress() { - return 0.5f; // TODO - } - - /** {@inheritDoc} */ - @Override public void progress() { - // TODO - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java deleted file mode 100644 index c7dc3fd..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; - -/** - * Hadoop setup task implementation for v1 API. - */ -public class GridHadoopV1SetupTask extends GridHadoopV1Task { - /** - * Constructor. - * - * @param taskInfo Task info. 
- */ - public GridHadoopV1SetupTask(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - try { - ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); - - OutputCommitter committer = ctx.jobConf().getOutputCommitter(); - - if (committer != null) - committer.setupJob(ctx.jobContext()); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java deleted file mode 100644 index 0e1fb44..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop API v1 splitter. - */ -public class GridHadoopV1Splitter { - /** */ - private static final String[] EMPTY_HOSTS = {}; - - /** - * @param jobConf Job configuration. - * @return Collection of mapped splits. - * @throws IgniteCheckedException If mapping failed. - */ - public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { - try { - InputFormat<?, ?> format = jobConf.getInputFormat(); - - assert format != null; - - InputSplit[] splits = format.getSplits(jobConf, 0); - - Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length); - - for (int i = 0; i < splits.length; i++) { - InputSplit nativeSplit = splits[i]; - - if (nativeSplit instanceof FileSplit) { - FileSplit s = (FileSplit)nativeSplit; - - res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); - } - else - res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); - } - - return res; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * @param clsName Input split class name. - * @param in Input stream. - * @param hosts Optional hosts. - * @return File block or {@code null} if it is not a {@link FileSplit} instance. - * @throws IgniteCheckedException If failed. 
- */ - @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, - @Nullable String[] hosts) throws IgniteCheckedException { - if (!FileSplit.class.getName().equals(clsName)) - return null; - - FileSplit split = U.newInstance(FileSplit.class); - - try { - split.readFields(in); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - if (hosts == null) - hosts = EMPTY_HOSTS; - - return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java deleted file mode 100644 index 305bc4e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.text.*; - -/** - * Extended Hadoop v1 task. - */ -public abstract class GridHadoopV1Task extends GridHadoopTask { - /** Indicates that this task is to be cancelled. */ - private volatile boolean cancelled; - - /** - * Constructor. - * - * @param taskInfo Task info. - */ - protected GridHadoopV1Task(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** - * Gets file name for that task result. - * - * @return File name. - */ - public String fileName() { - NumberFormat numFormat = NumberFormat.getInstance(); - - numFormat.setMinimumIntegerDigits(5); - numFormat.setGroupingUsed(false); - - return "part-" + numFormat.format(info().taskNumber()); - } - - /** - * - * @param jobConf Job configuration. - * @param taskCtx Task context. - * @param directWrite Direct write flag. - * @param fileName File name. - * @param attempt Attempt of task. - * @return Collector. - * @throws IOException In case of IO exception. - */ - protected GridHadoopV1OutputCollector collector(JobConf jobConf, GridHadoopV2TaskContext taskCtx, - boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { - GridHadoopV1OutputCollector collector = new GridHadoopV1OutputCollector(jobConf, taskCtx, directWrite, - fileName, attempt) { - /** {@inheritDoc} */ - @Override public void collect(Object key, Object val) throws IOException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - super.collect(key, val); - } - }; - - collector.setup(); - - return collector; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - cancelled = true; - } - - /** Returns true if task is cancelled. 
*/ - public boolean isCancelled() { - return cancelled; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java new file mode 100644 index 0000000..85f08be --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; + +/** + * Hadoop cleanup task implementation for v1 API. + */ +public class HadoopV1CleanupTask extends HadoopV1Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. 
+ * @param abort Abort flag. + */ + public HadoopV1CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobContext jobCtx = ctx.jobContext(); + + try { + OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter(); + + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java new file mode 100644 index 0000000..609297b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; + +import static org.apache.hadoop.mapreduce.util.CountersStrings.*; + +/** + * Hadoop counter implementation for v1 API. + */ +public class HadoopV1Counter extends Counters.Counter { + /** Delegate. */ + private final HadoopLongCounter cntr; + + /** + * Creates new instance. + * + * @param cntr Delegate counter. + */ + public HadoopV1Counter(HadoopLongCounter cntr) { + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public String makeEscapedCompactString() { + return toEscapedCompactString(new HadoopV2Counter(cntr)); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean contentEquals(Counters.Counter cntr) { + return 
getUnderlyingCounter().equals(cntr.getUnderlyingCounter()); + } + + /** {@inheritDoc} */ + @Override public long getCounter() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java new file mode 100644 index 0000000..51856d6 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +/** + * Hadoop map task implementation for v1 API. + */ +public class HadoopV1MapTask extends HadoopV1Task { + /** */ + private static final String[] EMPTY_HOSTS = new String[0]; + + /** + * Constructor. + * + * @param taskInfo + */ + public HadoopV1MapTask(GridHadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + GridHadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + InputFormat inFormat = jobConf.getInputFormat(); + + GridHadoopInputSplit split = info().inputSplit(); + + InputSplit nativeSplit; + + if (split instanceof GridHadoopFileBlock) { + GridHadoopFileBlock block = (GridHadoopFileBlock)split; + + nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); + } + else + nativeSplit = (InputSplit)ctx.getNativeSplit(split); + + assert nativeSplit != null; + + Reporter reporter = new HadoopV1Reporter(taskCtx); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), + fileName(), ctx.attemptId()); + + RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); + + Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); + + Object key = reader.createKey(); + Object val = reader.createValue(); + + assert mapper != null; + + try { + try { + while (reader.next(key, val)) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Map task 
cancelled."); + + mapper.map(key, val, collector, reporter); + } + } + finally { + mapper.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java new file mode 100644 index 0000000..ac23bb3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Hadoop output collector. + */ +public class HadoopV1OutputCollector implements OutputCollector { + /** Job configuration. */ + private final JobConf jobConf; + + /** Task context. */ + private final GridHadoopTaskContext taskCtx; + + /** Optional direct writer. */ + private final RecordWriter writer; + + /** Task attempt. */ + private final TaskAttemptID attempt; + + /** + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @throws IOException In case of IO exception. + */ + HadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite, + @Nullable String fileName, TaskAttemptID attempt) throws IOException { + this.jobConf = jobConf; + this.taskCtx = taskCtx; + this.attempt = attempt; + + if (directWrite) { + jobConf.set("mapreduce.task.attempt.id", attempt.toString()); + + OutputFormat outFormat = jobConf.getOutputFormat(); + + writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL); + } + else + writer = null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void collect(Object key, Object val) throws IOException { + if (writer != null) + writer.write(key, val); + else { + try { + taskCtx.output().write(key, val); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + } + + /** + * Close writer. + * + * @throws IOException In case of IO exception. + */ + public void closeWriter() throws IOException { + if (writer != null) + writer.close(Reporter.NULL); + } + + /** + * Setup task. + * + * @throws IOException If failed. 
+ */ + public void setup() throws IOException { + if (writer != null) + jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt)); + } + + /** + * Commit task. + * + * @throws IOException In failed. + */ + public void commit() throws IOException { + if (writer != null) { + OutputCommitter outputCommitter = jobConf.getOutputCommitter(); + + TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt); + + if (outputCommitter.needsTaskCommit(taskCtx)) + outputCommitter.commitTask(taskCtx); + } + } + + /** + * Abort task. + */ + public void abort() { + try { + if (writer != null) + jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt)); + } + catch (IOException ignore) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java new file mode 100644 index 0000000..36fdd55 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.internal.processors.hadoop.*; + +/** + * Hadoop partitioner adapter for v1 API. + */ +public class HadoopV1Partitioner implements GridHadoopPartitioner { + /** Partitioner instance. */ + private Partitioner<Object, Object> part; + + /** + * @param cls Hadoop partitioner class. + * @param conf Job configuration. + */ + public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { + part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key, Object val, int parts) { + return part.getPartition(key, val, parts); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java new file mode 100644 index 0000000..b5c6bfa --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +/** + * Hadoop reduce task implementation for v1 API. + */ +public class HadoopV1ReduceTask extends HadoopV1Task { + /** {@code True} if reduce, {@code false} if combine. */ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. 
+ */ + public HadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + GridHadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + GridHadoopTaskInput input = taskCtx.input(); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); + + Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(), + jobConf); + + assert reducer != null; + + try { + try { + while (input.next()) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Reduce task cancelled."); + + reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + } + } + finally { + reducer.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java new file mode 100644 index 0000000..db4e159 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +/** + * Hadoop reporter implementation for v1 API. + */ +public class HadoopV1Reporter implements Reporter { + /** Context. */ + private final GridHadoopTaskContext ctx; + + /** + * Creates new instance. + * + * @param ctx Context. 
+ */ + public HadoopV1Reporter(GridHadoopTaskContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void setStatus(String status) { + // TODO + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(Enum<?> name) { + return getCounter(name.getDeclaringClass().getName(), name.name()); + } + + /** {@inheritDoc} */ + @Override public Counters.Counter getCounter(String grp, String name) { + return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class)); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(Enum<?> key, long amount) { + getCounter(key).increment(amount); + } + + /** {@inheritDoc} */ + @Override public void incrCounter(String grp, String cntr, long amount) { + getCounter(grp, cntr).increment(amount); + } + + /** {@inheritDoc} */ + @Override public InputSplit getInputSplit() throws UnsupportedOperationException { + throw new UnsupportedOperationException("reporter has no input"); // TODO + } + + /** {@inheritDoc} */ + @Override public float getProgress() { + return 0.5f; // TODO + } + + /** {@inheritDoc} */ + @Override public void progress() { + // TODO + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java new file mode 100644 index 0000000..c427774 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; + +/** + * Hadoop setup task implementation for v1 API. + */ +public class HadoopV1SetupTask extends HadoopV1Task { + /** + * Constructor. + * + * @param taskInfo Task info. 
+ */ + public HadoopV1SetupTask(GridHadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + try { + ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); + + OutputCommitter committer = ctx.jobConf().getOutputCommitter(); + + if (committer != null) + committer.setupJob(ctx.jobContext()); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java new file mode 100644 index 0000000..0d89082 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop API v1 splitter. + */ +public class HadoopV1Splitter { + /** */ + private static final String[] EMPTY_HOSTS = {}; + + /** + * @param jobConf Job configuration. + * @return Collection of mapped splits. + * @throws IgniteCheckedException If mapping failed. + */ + public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { + try { + InputFormat<?, ?> format = jobConf.getInputFormat(); + + assert format != null; + + InputSplit[] splits = format.getSplits(jobConf, 0); + + Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length); + + for (int i = 0; i < splits.length; i++) { + InputSplit nativeSplit = splits[i]; + + if (nativeSplit instanceof FileSplit) { + FileSplit s = (FileSplit)nativeSplit; + + res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + } + else + res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); + } + + return res; + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * @param clsName Input split class name. + * @param in Input stream. + * @param hosts Optional hosts. + * @return File block or {@code null} if it is not a {@link FileSplit} instance. + * @throws IgniteCheckedException If failed. 
+ */ + @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, + @Nullable String[] hosts) throws IgniteCheckedException { + if (!FileSplit.class.getName().equals(clsName)) + return null; + + FileSplit split = U.newInstance(FileSplit.class); + + try { + split.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + if (hosts == null) + hosts = EMPTY_HOSTS; + + return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java new file mode 100644 index 0000000..71a259c --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.text.*; + +/** + * Extended Hadoop v1 task. + */ +public abstract class HadoopV1Task extends GridHadoopTask { + /** Indicates that this task is to be cancelled. */ + private volatile boolean cancelled; + + /** + * Constructor. + * + * @param taskInfo Task info. + */ + protected HadoopV1Task(GridHadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** + * Gets file name for that task result. + * + * @return File name. + */ + public String fileName() { + NumberFormat numFormat = NumberFormat.getInstance(); + + numFormat.setMinimumIntegerDigits(5); + numFormat.setGroupingUsed(false); + + return "part-" + numFormat.format(info().taskNumber()); + } + + /** + * + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @param attempt Attempt of task. + * @return Collector. + * @throws IOException In case of IO exception. + */ + protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx, + boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { + HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite, + fileName, attempt) { + /** {@inheritDoc} */ + @Override public void collect(Object key, Object val) throws IOException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + super.collect(key, val); + } + }; + + collector.setup(); + + return collector; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelled = true; + } + + /** Returns true if task is cancelled. 
*/ + public boolean isCancelled() { + return cancelled; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java deleted file mode 100644 index 36b40a2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.ignite.internal.processors.hadoop.*; - -import java.io.*; - -/** - * Split serialized in external file. - */ -public class GridHadoopExternalSplit extends GridHadoopInputSplit { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long off; - - /** - * For {@link Externalizable}. - */ - public GridHadoopExternalSplit() { - // No-op. 
- } - - /** - * @param hosts Hosts. - * @param off Offset of this split in external file. - */ - public GridHadoopExternalSplit(String[] hosts, long off) { - assert off >= 0 : off; - assert hosts != null; - - this.hosts = hosts; - this.off = off; - } - - /** - * @return Offset of this input split in external file. - */ - public long offset() { - return off; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(off); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - off = in.readLong(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopExternalSplit that = (GridHadoopExternalSplit) o; - - return off == that.off; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return (int)(off ^ (off >>> 32)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java deleted file mode 100644 index 5ef4759..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.classification.*; -import org.apache.hadoop.conf.*; - -/** - * A fake helper to load the native hadoop code i.e. libhadoop.so. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class GridHadoopNativeCodeLoader { - /** - * Check if native-hadoop code is loaded for this platform. - * - * @return <code>true</code> if native-hadoop is loaded, - * else <code>false</code> - */ - public static boolean isNativeCodeLoaded() { - return false; - } - - /** - * Returns true only if this build was compiled with support for snappy. - */ - public static boolean buildSupportsSnappy() { - return false; - } - - /** - * @return Library name. - */ - public static String getLibraryName() { - throw new IllegalStateException(); - } - - /** - * Return if native hadoop libraries, if present, can be used for this job. - * @param conf configuration - * - * @return <code>true</code> if native hadoop libraries, if present, can be - * used for this job; <code>false</code> otherwise. - */ - public boolean getLoadNativeLibraries(Configuration conf) { - return false; - } - - /** - * Set if native hadoop libraries, if present, can be used for this job. 
- * - * @param conf configuration - * @param loadNativeLibraries can native hadoop libraries be loaded - */ - public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) { - // No-op. - } -} - http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java deleted file mode 100644 index 0f38548..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.io.serializer.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * The wrapper around external serializer. - */ -public class GridHadoopSerializationWrapper<T> implements GridHadoopSerialization { - /** External serializer - writer. */ - private final Serializer<T> serializer; - - /** External serializer - reader. */ - private final Deserializer<T> deserializer; - - /** Data output for current write operation. */ - private OutputStream currOut; - - /** Data input for current read operation. */ - private InputStream currIn; - - /** Wrapper around current output to provide OutputStream interface. */ - private final OutputStream outStream = new OutputStream() { - /** {@inheritDoc} */ - @Override public void write(int b) throws IOException { - currOut.write(b); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) throws IOException { - currOut.write(b, off, len); - } - }; - - /** Wrapper around current input to provide InputStream interface. */ - private final InputStream inStream = new InputStream() { - /** {@inheritDoc} */ - @Override public int read() throws IOException { - return currIn.read(); - } - - /** {@inheritDoc} */ - @Override public int read(byte[] b, int off, int len) throws IOException { - return currIn.read(b, off, len); - } - }; - - /** - * @param serialization External serializer to wrap. - * @param cls The class to serialize. 
- */ - public GridHadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException { - assert cls != null; - - serializer = serialization.getSerializer(cls); - deserializer = serialization.getDeserializer(cls); - - try { - serializer.open(outStream); - deserializer.open(inStream); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { - assert out != null; - assert obj != null; - - try { - currOut = (OutputStream)out; - - serializer.serialize((T)obj); - - currOut = null; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { - assert in != null; - - try { - currIn = (InputStream)in; - - T res = deserializer.deserialize((T) obj); - - currIn = null; - - return res; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - try { - serializer.close(); - deserializer.close(); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java deleted file mode 100644 index 48558fc..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed 
/**
 * Fake manager for shutdown hooks.
 * <p>
 * Hooks are only remembered in a set so that they can later be queried or removed. Nothing in
 * this class ever runs a registered hook, and no method of this class changes the
 * shutdown-in-progress flag, so {@link #isShutdownInProgress()} keeps returning {@code false}.
 */
public class GridHadoopShutdownHookManager {
    /** Singleton instance. */
    private static final GridHadoopShutdownHookManager MGR = new GridHadoopShutdownHookManager();

    /** Registered hooks. The set is a synchronized wrapper, so individual calls are thread-safe. */
    private final Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());

    /** Shutdown flag. It is created as {@code false} and is never modified by this class. */
    private final AtomicBoolean shutdownInProgress = new AtomicBoolean(false);

    /**
     * Singleton.
     */
    private GridHadoopShutdownHookManager() {
        // No-op.
    }

    /**
     * Return <code>ShutdownHookManager</code> singleton.
     *
     * @return <code>ShutdownHookManager</code> singleton.
     */
    public static GridHadoopShutdownHookManager get() {
        return MGR;
    }

    /**
     * Registers a shutdownHook.
     * <p>
     * The priority argument is accepted for signature compatibility only: this fake manager
     * neither stores it nor orders hooks by it.
     *
     * @param shutdownHook shutdownHook <code>Runnable</code>
     * @param priority priority of the shutdownHook (ignored).
     * @throws IllegalArgumentException If {@code shutdownHook} is {@code null}.
     */
    public void addShutdownHook(Runnable shutdownHook, int priority) {
        if (shutdownHook == null)
            throw new IllegalArgumentException("shutdownHook cannot be NULL");

        hooks.add(shutdownHook);
    }

    /**
     * Removes a shutdownHook.
     *
     * @param shutdownHook shutdownHook to remove.
     * @return TRUE if the shutdownHook was registered and removed,
     * FALSE otherwise.
     */
    public boolean removeShutdownHook(Runnable shutdownHook) {
        return hooks.remove(shutdownHook);
    }

    /**
     * Indicates if a shutdownHook is registered or not.
     *
     * @param shutdownHook shutdownHook to check if registered.
     * @return TRUE/FALSE depending on whether the shutdownHook is registered.
     */
    public boolean hasShutdownHook(Runnable shutdownHook) {
        return hooks.contains(shutdownHook);
    }

    /**
     * Indicates if shutdown is in progress or not.
     *
     * @return TRUE if the shutdown is in progress, otherwise FALSE.
     */
    public boolean isShutdownInProgress() {
        return shutdownInProgress.get();
    }
}
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * The wrapper for native hadoop input splits. - * - * Warning!! This class must not depend on any Hadoop classes directly or indirectly. - */ -public class GridHadoopSplitWrapper extends GridHadoopInputSplit { - /** */ - private static final long serialVersionUID = 0L; - - /** Native hadoop input split. */ - private byte[] bytes; - - /** */ - private String clsName; - - /** Internal ID */ - private int id; - - /** - * Creates new split wrapper. - */ - public GridHadoopSplitWrapper() { - // No-op. - } - - /** - * Creates new split wrapper. - * - * @param id Split ID. - * @param clsName Class name. - * @param bytes Serialized class. - * @param hosts Hosts where split is located. - */ - public GridHadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) { - assert hosts != null; - assert clsName != null; - assert bytes != null; - - this.hosts = hosts; - this.id = id; - - this.clsName = clsName; - this.bytes = bytes; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(id); - - out.writeUTF(clsName); - U.writeByteArray(out, bytes); - } - - /** - * @return Class name. 
- */ - public String className() { - return clsName; - } - - /** - * @return Class bytes. - */ - public byte[] bytes() { - return bytes; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - id = in.readInt(); - - clsName = in.readUTF(); - bytes = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - GridHadoopSplitWrapper that = (GridHadoopSplitWrapper)o; - - return id == that.id; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return id; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java deleted file mode 100644 index 38be3da..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; - -import java.io.*; - -/** - * Hadoop cleanup task (commits or aborts job). - */ -public class GridHadoopV2CleanupTask extends GridHadoopV2Task { - /** Abort flag. */ - private final boolean abort; - - /** - * @param taskInfo Task info. - * @param abort Abort flag. 
- */ - public GridHadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) { - super(taskInfo); - - this.abort = abort; - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException { - JobContextImpl jobCtx = taskCtx.jobContext(); - - try { - OutputFormat outputFormat = getOutputFormat(jobCtx); - - OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); - - if (committer != null) { - if (abort) - committer.abortJob(jobCtx, JobStatus.State.FAILED); - else - committer.commitJob(jobCtx); - } - } - catch (ClassNotFoundException | IOException e) { - throw new IgniteCheckedException(e); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new IgniteInterruptedCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java deleted file mode 100644 index 9964d91..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v2; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.task.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks. - */ -public class GridHadoopV2Context extends JobContextImpl implements MapContext, ReduceContext { - /** Input reader to overriding of GridHadoopTaskContext input. */ - private RecordReader reader; - - /** Output writer to overriding of GridHadoopTaskContext output. */ - private RecordWriter writer; - - /** Output is provided by executor environment. */ - private final GridHadoopTaskOutput output; - - /** Input is provided by executor environment. */ - private final GridHadoopTaskInput input; - - /** Unique identifier for a task attempt. */ - private final TaskAttemptID taskAttemptID; - - /** Indicates that this task is to be cancelled. */ - private volatile boolean cancelled; - - /** Input split. */ - private InputSplit inputSplit; - - /** */ - private final GridHadoopTaskContext ctx; - - /** */ - private String status; - - /** - * @param ctx Context for IO operations. 
- */ - public GridHadoopV2Context(GridHadoopV2TaskContext ctx) { - super(ctx.jobConf(), ctx.jobContext().getJobID()); - - taskAttemptID = ctx.attemptId(); - - conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString()); - conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString()); - - output = ctx.output(); - input = ctx.input(); - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public InputSplit getInputSplit() { - if (inputSplit == null) { - GridHadoopInputSplit split = ctx.taskInfo().inputSplit(); - - if (split == null) - return null; - - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split; - - inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null); - } - else if (split instanceof GridHadoopExternalSplit) - throw new UnsupportedOperationException(); // TODO - else if (split instanceof GridHadoopSplitWrapper) - inputSplit = (InputSplit) HadoopUtils.unwrapSplit((GridHadoopSplitWrapper) split); - else - throw new IllegalStateException(); - } - - return inputSplit; - } - - /** {@inheritDoc} */ - @Override public boolean nextKeyValue() throws IOException, InterruptedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - return reader.nextKeyValue(); - } - - /** {@inheritDoc} */ - @Override public Object getCurrentKey() throws IOException, InterruptedException { - if (reader != null) - return reader.getCurrentKey(); - - return input.key(); - } - - /** {@inheritDoc} */ - @Override public Object getCurrentValue() throws IOException, InterruptedException { - return reader.getCurrentValue(); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void write(Object key, Object val) throws IOException, InterruptedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - if (writer != null) - writer.write(key, val); - else { - try { - output.write(key, 
val); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - } - - /** {@inheritDoc} */ - @Override public OutputCommitter getOutputCommitter() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public TaskAttemptID getTaskAttemptID() { - return taskAttemptID; - } - - /** {@inheritDoc} */ - @Override public void setStatus(String msg) { - status = msg; - } - - /** {@inheritDoc} */ - @Override public String getStatus() { - return status; - } - - /** {@inheritDoc} */ - @Override public float getProgress() { - return 0.5f; // TODO - } - - /** {@inheritDoc} */ - @Override public Counter getCounter(Enum<?> cntrName) { - return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name()); - } - - /** {@inheritDoc} */ - @Override public Counter getCounter(String grpName, String cntrName) { - return new GridHadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class)); - } - - /** {@inheritDoc} */ - @Override public void progress() { - // No-op. - } - - /** - * Overrides default input data reader. - * - * @param reader New reader. - */ - public void reader(RecordReader reader) { - this.reader = reader; - } - - /** {@inheritDoc} */ - @Override public boolean nextKey() throws IOException, InterruptedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - return input.next(); - } - - /** {@inheritDoc} */ - @Override public Iterable getValues() throws IOException, InterruptedException { - return new Iterable() { - @Override public Iterator iterator() { - return input.values(); - } - }; - } - - /** - * @return Overridden output data writer. - */ - public RecordWriter writer() { - return writer; - } - - /** - * Overrides default output data writer. - * - * @param writer New writer. - */ - public void writer(RecordWriter writer) { - this.writer = writer; - } - - /** - * Cancels the task by stop the IO. - */ - public void cancel() { - cancelled = true; - } -}