http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java new file mode 100644 index 0000000..c21e494 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; + +/** + * Hadoop communication message listener. + */ +public interface HadoopMessageListener { + /** + * @param desc Process descriptor. + * @param msg Hadoop message. + */ + public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg); + + /** + * Called when connection to remote process was lost. + * + * @param desc Process descriptor. + */ + public void onConnectionLost(HadoopProcessDescriptor desc); +}
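
For reference, an implementation of this listener only needs to provide the two callbacks declared above; a minimal sketch is shown below (the class name and the console logging are illustrative assumptions for this note, not part of the commit):

package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;

import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;

/**
 * Illustrative listener that simply logs communication events.
 * (Sketch only; not part of this commit.)
 */
public class LoggingHadoopMessageListener implements HadoopMessageListener {
    /** {@inheritDoc} */
    @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
        // React to a message arriving from the external Hadoop process.
        System.out.println("Received message " + msg + " from process " + desc);
    }

    /** {@inheritDoc} */
    @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
        // React to the connection to the external process being dropped.
        System.out.println("Connection lost to process " + desc);
    }
}
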
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java new file mode 100644 index 0000000..c4d1c54 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Grid client for NIO server. + */ +public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunicationClient { + /** Socket. */ + private final GridNioSession ses; + + /** + * Constructor for test purposes only. + */ + public HadoopTcpNioCommunicationClient() { + ses = null; + } + + /** + * @param ses Session. + */ + public HadoopTcpNioCommunicationClient(GridNioSession ses) { + assert ses != null; + + this.ses = ses; + } + + /** {@inheritDoc} */ + @Override public boolean close() { + boolean res = super.close(); + + if (res) + ses.close(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void forceClose() { + super.forceClose(); + + ses.close(); + } + + /** {@inheritDoc} */ + @Override public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) + throws IgniteCheckedException { + if (closed()) + throw new IgniteCheckedException("Client was closed: " + this); + + GridNioFuture<?> fut = ses.send(msg); + + if (fut.isDone()) { + try { + fut.get(); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); + } + } + } + + /** {@inheritDoc} */ + @Override public long getIdleTime() { + long now = U.currentTimeMillis(); + + // Session can be used for receiving and sending. 
+ return Math.min(Math.min(now - ses.lastReceiveTime(), now - ses.lastSendScheduleTime()), + now - ses.lastSendTime()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTcpNioCommunicationClient.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java deleted file mode 100644 index 99ee9b77..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1CleanupTask.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; - -/** - * Hadoop cleanup task implementation for v1 API. - */ -public class GridHadoopV1CleanupTask extends GridHadoopV1Task { - /** Abort flag. */ - private final boolean abort; - - /** - * @param taskInfo Task info. - * @param abort Abort flag. 
- */ - public GridHadoopV1CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) { - super(taskInfo); - - this.abort = abort; - } - - /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - JobContext jobCtx = ctx.jobContext(); - - try { - OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter(); - - if (abort) - committer.abortJob(jobCtx, JobStatus.State.FAILED); - else - committer.commitJob(jobCtx); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java deleted file mode 100644 index b986d3e..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Counter.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; - -import static org.apache.hadoop.mapreduce.util.CountersStrings.*; - -/** - * Hadoop counter implementation for v1 API. - */ -public class GridHadoopV1Counter extends Counters.Counter { - /** Delegate. */ - private final GridHadoopLongCounter cntr; - - /** - * Creates new instance. - * - * @param cntr Delegate counter. - */ - public GridHadoopV1Counter(GridHadoopLongCounter cntr) { - this.cntr = cntr; - } - - /** {@inheritDoc} */ - @Override public void setDisplayName(String displayName) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public String getName() { - return cntr.name(); - } - - /** {@inheritDoc} */ - @Override public String getDisplayName() { - return getName(); - } - - /** {@inheritDoc} */ - @Override public long getValue() { - return cntr.value(); - } - - /** {@inheritDoc} */ - @Override public void setValue(long val) { - cntr.value(val); - } - - /** {@inheritDoc} */ - @Override public void increment(long incr) { - cntr.increment(incr); - } - - /** {@inheritDoc} */ - @Override public void write(DataOutput out) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public void readFields(DataInput in) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - /** {@inheritDoc} */ - @Override public String makeEscapedCompactString() { - return toEscapedCompactString(new GridHadoopV2Counter(cntr)); - } - - /** {@inheritDoc} */ - @SuppressWarnings("deprecation") - @Override public boolean contentEquals(Counters.Counter cntr) { - return getUnderlyingCounter().equals(cntr.getUnderlyingCounter()); - } - - /** {@inheritDoc} */ - @Override public long getCounter() { - return cntr.value(); - } - - /** {@inheritDoc} */ - @Override public Counter getUnderlyingCounter() { - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java deleted file mode 100644 index 878b61b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -/** - * Hadoop map task implementation for v1 API. - */ -public class GridHadoopV1MapTask extends GridHadoopV1Task { - /** */ - private static final String[] EMPTY_HOSTS = new String[0]; - - /** - * Constructor. 
- * - * @param taskInfo - */ - public GridHadoopV1MapTask(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopJob job = taskCtx.job(); - - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - JobConf jobConf = ctx.jobConf(); - - InputFormat inFormat = jobConf.getInputFormat(); - - GridHadoopInputSplit split = info().inputSplit(); - - InputSplit nativeSplit; - - if (split instanceof GridHadoopFileBlock) { - GridHadoopFileBlock block = (GridHadoopFileBlock)split; - - nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); - } - else - nativeSplit = (InputSplit)ctx.getNativeSplit(split); - - assert nativeSplit != null; - - Reporter reporter = new GridHadoopV1Reporter(taskCtx); - - GridHadoopV1OutputCollector collector = null; - - try { - collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), - fileName(), ctx.attemptId()); - - RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); - - Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); - - Object key = reader.createKey(); - Object val = reader.createValue(); - - assert mapper != null; - - try { - try { - while (reader.next(key, val)) { - if (isCancelled()) - throw new GridHadoopTaskCancelledException("Map task cancelled."); - - mapper.map(key, val, collector, reporter); - } - } - finally { - mapper.close(); - } - } - finally { - collector.closeWriter(); - } - - collector.commit(); - } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java deleted file mode 100644 index 2a38684..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Hadoop output collector. - */ -public class GridHadoopV1OutputCollector implements OutputCollector { - /** Job configuration. */ - private final JobConf jobConf; - - /** Task context. */ - private final GridHadoopTaskContext taskCtx; - - /** Optional direct writer. */ - private final RecordWriter writer; - - /** Task attempt. */ - private final TaskAttemptID attempt; - - /** - * @param jobConf Job configuration. - * @param taskCtx Task context. - * @param directWrite Direct write flag. - * @param fileName File name. - * @throws IOException In case of IO exception. - */ - GridHadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite, - @Nullable String fileName, TaskAttemptID attempt) throws IOException { - this.jobConf = jobConf; - this.taskCtx = taskCtx; - this.attempt = attempt; - - if (directWrite) { - jobConf.set("mapreduce.task.attempt.id", attempt.toString()); - - OutputFormat outFormat = jobConf.getOutputFormat(); - - writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL); - } - else - writer = null; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void collect(Object key, Object val) throws IOException { - if (writer != null) - writer.write(key, val); - else { - try { - taskCtx.output().write(key, val); - } - catch (IgniteCheckedException e) { - throw new IOException(e); - } - } - } - - /** - * Close writer. - * - * @throws IOException In case of IO exception. - */ - public void closeWriter() throws IOException { - if (writer != null) - writer.close(Reporter.NULL); - } - - /** - * Setup task. - * - * @throws IOException If failed. - */ - public void setup() throws IOException { - if (writer != null) - jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt)); - } - - /** - * Commit task. - * - * @throws IOException In failed. - */ - public void commit() throws IOException { - if (writer != null) { - OutputCommitter outputCommitter = jobConf.getOutputCommitter(); - - TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt); - - if (outputCommitter.needsTaskCommit(taskCtx)) - outputCommitter.commitTask(taskCtx); - } - } - - /** - * Abort task. - */ - public void abort() { - try { - if (writer != null) - jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt)); - } - catch (IOException ignore) { - // No-op. - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java deleted file mode 100644 index 688ccef..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.internal.processors.hadoop.*; - -/** - * Hadoop partitioner adapter for v1 API. - */ -public class GridHadoopV1Partitioner implements GridHadoopPartitioner { - /** Partitioner instance. */ - private Partitioner<Object, Object> part; - - /** - * @param cls Hadoop partitioner class. - * @param conf Job configuration. - */ - public GridHadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { - part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); - } - - /** {@inheritDoc} */ - @Override public int partition(Object key, Object val, int parts) { - return part.getPartition(key, val, parts); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java deleted file mode 100644 index 7deea90..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -/** - * Hadoop reduce task implementation for v1 API. - */ -public class GridHadoopV1ReduceTask extends GridHadoopV1Task { - /** {@code True} if reduce, {@code false} if combine. */ - private final boolean reduce; - - /** - * Constructor. 
- * - * @param taskInfo Task info. - * @param reduce {@code True} if reduce, {@code false} if combine. - */ - public GridHadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) { - super(taskInfo); - - this.reduce = reduce; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopJob job = taskCtx.job(); - - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - JobConf jobConf = ctx.jobConf(); - - GridHadoopTaskInput input = taskCtx.input(); - - GridHadoopV1OutputCollector collector = null; - - try { - collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); - - Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(), - jobConf); - - assert reducer != null; - - try { - try { - while (input.next()) { - if (isCancelled()) - throw new GridHadoopTaskCancelledException("Reduce task cancelled."); - - reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); - } - } - finally { - reducer.close(); - } - } - finally { - collector.closeWriter(); - } - - collector.commit(); - } - catch (Exception e) { - if (collector != null) - collector.abort(); - - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java deleted file mode 100644 index 1abb2c0..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; - -/** - * Hadoop reporter implementation for v1 API. - */ -public class GridHadoopV1Reporter implements Reporter { - /** Context. */ - private final GridHadoopTaskContext ctx; - - /** - * Creates new instance. - * - * @param ctx Context. 
- */ - public GridHadoopV1Reporter(GridHadoopTaskContext ctx) { - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public void setStatus(String status) { - // TODO - } - - /** {@inheritDoc} */ - @Override public Counters.Counter getCounter(Enum<?> name) { - return getCounter(name.getDeclaringClass().getName(), name.name()); - } - - /** {@inheritDoc} */ - @Override public Counters.Counter getCounter(String grp, String name) { - return new GridHadoopV1Counter(ctx.counter(grp, name, GridHadoopLongCounter.class)); - } - - /** {@inheritDoc} */ - @Override public void incrCounter(Enum<?> key, long amount) { - getCounter(key).increment(amount); - } - - /** {@inheritDoc} */ - @Override public void incrCounter(String grp, String cntr, long amount) { - getCounter(grp, cntr).increment(amount); - } - - /** {@inheritDoc} */ - @Override public InputSplit getInputSplit() throws UnsupportedOperationException { - throw new UnsupportedOperationException("reporter has no input"); // TODO - } - - /** {@inheritDoc} */ - @Override public float getProgress() { - return 0.5f; // TODO - } - - /** {@inheritDoc} */ - @Override public void progress() { - // TODO - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java deleted file mode 100644 index c7dc3fd..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; - -/** - * Hadoop setup task implementation for v1 API. - */ -public class GridHadoopV1SetupTask extends GridHadoopV1Task { - /** - * Constructor. - * - * @param taskInfo Task info. 
- */ - public GridHadoopV1SetupTask(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** {@inheritDoc} */ - @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx; - - try { - ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf()); - - OutputCommitter committer = ctx.jobConf().getOutputCommitter(); - - if (committer != null) - committer.setupJob(ctx.jobContext()); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java deleted file mode 100644 index 257f4ea..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop API v1 splitter. - */ -public class GridHadoopV1Splitter { - /** */ - private static final String[] EMPTY_HOSTS = {}; - - /** - * @param jobConf Job configuration. - * @return Collection of mapped splits. - * @throws IgniteCheckedException If mapping failed. - */ - public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException { - try { - InputFormat<?, ?> format = jobConf.getInputFormat(); - - assert format != null; - - InputSplit[] splits = format.getSplits(jobConf, 0); - - Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length); - - for (int i = 0; i < splits.length; i++) { - InputSplit nativeSplit = splits[i]; - - if (nativeSplit instanceof FileSplit) { - FileSplit s = (FileSplit)nativeSplit; - - res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); - } - else - res.add(GridHadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations())); - } - - return res; - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - } - - /** - * @param clsName Input split class name. - * @param in Input stream. 
- * @param hosts Optional hosts. - * @return File block or {@code null} if it is not a {@link FileSplit} instance. - * @throws IgniteCheckedException If failed. - */ - @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in, - @Nullable String[] hosts) throws IgniteCheckedException { - if (!FileSplit.class.getName().equals(clsName)) - return null; - - FileSplit split = U.newInstance(FileSplit.class); - - try { - split.readFields(in); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - if (hosts == null) - hosts = EMPTY_HOSTS; - - return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java deleted file mode 100644 index 86a7264..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.v1; - -import org.apache.hadoop.mapred.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.text.*; - -/** - * Extended Hadoop v1 task. - */ -public abstract class GridHadoopV1Task extends GridHadoopTask { - /** Indicates that this task is to be cancelled. */ - private volatile boolean cancelled; - - /** - * Constructor. - * - * @param taskInfo Task info. - */ - protected GridHadoopV1Task(GridHadoopTaskInfo taskInfo) { - super(taskInfo); - } - - /** - * Gets file name for that task result. - * - * @return File name. - */ - public String fileName() { - NumberFormat numFormat = NumberFormat.getInstance(); - - numFormat.setMinimumIntegerDigits(5); - numFormat.setGroupingUsed(false); - - return "part-" + numFormat.format(info().taskNumber()); - } - - /** - * - * @param jobConf Job configuration. - * @param taskCtx Task context. - * @param directWrite Direct write flag. - * @param fileName File name. - * @param attempt Attempt of task. - * @return Collector. - * @throws IOException In case of IO exception. 
- */ - protected GridHadoopV1OutputCollector collector(JobConf jobConf, GridHadoopV2TaskContext taskCtx, - boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException { - GridHadoopV1OutputCollector collector = new GridHadoopV1OutputCollector(jobConf, taskCtx, directWrite, - fileName, attempt) { - /** {@inheritDoc} */ - @Override public void collect(Object key, Object val) throws IOException { - if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); - - super.collect(key, val); - } - }; - - collector.setup(); - - return collector; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - cancelled = true; - } - - /** Returns true if task is cancelled. */ - public boolean isCancelled() { - return cancelled; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java new file mode 100644 index 0000000..fa570ea --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; + +/** + * Hadoop cleanup task implementation for v1 API. + */ +public class HadoopV1CleanupTask extends HadoopV1Task { + /** Abort flag. */ + private final boolean abort; + + /** + * @param taskInfo Task info. + * @param abort Abort flag. 
+ */ + public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) { + super(taskInfo); + + this.abort = abort; + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobContext jobCtx = ctx.jobContext(); + + try { + OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter(); + + if (abort) + committer.abortJob(jobCtx, JobStatus.State.FAILED); + else + committer.commitJob(jobCtx); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java new file mode 100644 index 0000000..609297b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; + +import static org.apache.hadoop.mapreduce.util.CountersStrings.*; + +/** + * Hadoop counter implementation for v1 API. + */ +public class HadoopV1Counter extends Counters.Counter { + /** Delegate. */ + private final HadoopLongCounter cntr; + + /** + * Creates new instance. + * + * @param cntr Delegate counter. + */ + public HadoopV1Counter(HadoopLongCounter cntr) { + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void setDisplayName(String displayName) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String getName() { + return cntr.name(); + } + + /** {@inheritDoc} */ + @Override public String getDisplayName() { + return getName(); + } + + /** {@inheritDoc} */ + @Override public long getValue() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public void setValue(long val) { + cntr.value(val); + } + + /** {@inheritDoc} */ + @Override public void increment(long incr) { + cntr.increment(incr); + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public void readFields(DataInput in) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + /** {@inheritDoc} */ + @Override public String makeEscapedCompactString() { + return toEscapedCompactString(new HadoopV2Counter(cntr)); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public boolean contentEquals(Counters.Counter cntr) { + return getUnderlyingCounter().equals(cntr.getUnderlyingCounter()); + } + + /** {@inheritDoc} */ + @Override public long getCounter() { + return cntr.value(); + } + + /** {@inheritDoc} */ + @Override public Counter getUnderlyingCounter() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java new file mode 100644 index 0000000..ad7b058 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +/** + * Hadoop map task implementation for v1 API. + */ +public class HadoopV1MapTask extends HadoopV1Task { + /** */ + private static final String[] EMPTY_HOSTS = new String[0]; + + /** + * Constructor. 
+ * + * @param taskInfo + */ + public HadoopV1MapTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + InputFormat inFormat = jobConf.getInputFormat(); + + HadoopInputSplit split = info().inputSplit(); + + InputSplit nativeSplit; + + if (split instanceof HadoopFileBlock) { + HadoopFileBlock block = (HadoopFileBlock)split; + + nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS); + } + else + nativeSplit = (InputSplit)ctx.getNativeSplit(split); + + assert nativeSplit != null; + + Reporter reporter = new HadoopV1Reporter(taskCtx); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(), + fileName(), ctx.attemptId()); + + RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter); + + Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf); + + Object key = reader.createKey(); + Object val = reader.createValue(); + + assert mapper != null; + + try { + try { + while (reader.next(key, val)) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Map task cancelled."); + + mapper.map(key, val, collector, reporter); + } + } + finally { + mapper.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java new file mode 100644 index 0000000..348274d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Hadoop output collector. 
+ */ +public class HadoopV1OutputCollector implements OutputCollector { + /** Job configuration. */ + private final JobConf jobConf; + + /** Task context. */ + private final HadoopTaskContext taskCtx; + + /** Optional direct writer. */ + private final RecordWriter writer; + + /** Task attempt. */ + private final TaskAttemptID attempt; + + /** + * @param jobConf Job configuration. + * @param taskCtx Task context. + * @param directWrite Direct write flag. + * @param fileName File name. + * @throws IOException In case of IO exception. + */ + HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite, + @Nullable String fileName, TaskAttemptID attempt) throws IOException { + this.jobConf = jobConf; + this.taskCtx = taskCtx; + this.attempt = attempt; + + if (directWrite) { + jobConf.set("mapreduce.task.attempt.id", attempt.toString()); + + OutputFormat outFormat = jobConf.getOutputFormat(); + + writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL); + } + else + writer = null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void collect(Object key, Object val) throws IOException { + if (writer != null) + writer.write(key, val); + else { + try { + taskCtx.output().write(key, val); + } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + } + } + + /** + * Close writer. + * + * @throws IOException In case of IO exception. + */ + public void closeWriter() throws IOException { + if (writer != null) + writer.close(Reporter.NULL); + } + + /** + * Setup task. + * + * @throws IOException If failed. + */ + public void setup() throws IOException { + if (writer != null) + jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt)); + } + + /** + * Commit task. + * + * @throws IOException In failed. + */ + public void commit() throws IOException { + if (writer != null) { + OutputCommitter outputCommitter = jobConf.getOutputCommitter(); + + TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt); + + if (outputCommitter.needsTaskCommit(taskCtx)) + outputCommitter.commitTask(taskCtx); + } + } + + /** + * Abort task. + */ + public void abort() { + try { + if (writer != null) + jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt)); + } + catch (IOException ignore) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java new file mode 100644 index 0000000..e45f92b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.internal.processors.hadoop.*; + +/** + * Hadoop partitioner adapter for v1 API. + */ +public class HadoopV1Partitioner implements HadoopPartitioner { + /** Partitioner instance. */ + private Partitioner<Object, Object> part; + + /** + * @param cls Hadoop partitioner class. + * @param conf Job configuration. + */ + public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) { + part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key, Object val, int parts) { + return part.getPartition(key, val, parts); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java new file mode 100644 index 0000000..18ee09d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +/** + * Hadoop reduce task implementation for v1 API. + */ +public class HadoopV1ReduceTask extends HadoopV1Task { + /** {@code True} if reduce, {@code false} if combine. */ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. 
+ */ + public HadoopV1ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopJob job = taskCtx.job(); + + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + JobConf jobConf = ctx.jobConf(); + + HadoopTaskInput input = taskCtx.input(); + + HadoopV1OutputCollector collector = null; + + try { + collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId()); + + Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(), + jobConf); + + assert reducer != null; + + try { + try { + while (input.next()) { + if (isCancelled()) + throw new HadoopTaskCancelledException("Reduce task cancelled."); + + reducer.reduce(input.key(), input.values(), collector, Reporter.NULL); + } + } + finally { + reducer.close(); + } + } + finally { + collector.closeWriter(); + } + + collector.commit(); + } + catch (Exception e) { + if (collector != null) + collector.abort(); + + throw new IgniteCheckedException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java new file mode 100644 index 0000000..d799373 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v1; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; + +/** + * Hadoop reporter implementation for v1 API. + */ +public class HadoopV1Reporter implements Reporter { + /** Context. */ + private final HadoopTaskContext ctx; + + /** + * Creates new instance. + * + * @param ctx Context. 
+     */
+    public HadoopV1Reporter(HadoopTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStatus(String status) {
+        // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters.Counter getCounter(Enum<?> name) {
+        return getCounter(name.getDeclaringClass().getName(), name.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters.Counter getCounter(String grp, String name) {
+        return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrCounter(Enum<?> key, long amount) {
+        getCounter(key).increment(amount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrCounter(String grp, String cntr, long amount) {
+        getCounter(grp, cntr).increment(amount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("reporter has no input"); // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getProgress() {
+        return 0.5f; // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public void progress() {
+        // TODO
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
new file mode 100644
index 0000000..a758f1d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop setup task implementation for v1 API.
+ */
+public class HadoopV1SetupTask extends HadoopV1Task {
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV1SetupTask(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        try {
+            ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
+
+            OutputCommitter committer = ctx.jobConf().getOutputCommitter();
+
+            if (committer != null)
+                committer.setupJob(ctx.jobContext());
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
new file mode 100644
index 0000000..9eebbb8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop API v1 splitter.
+ */
+public class HadoopV1Splitter {
+    /** */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * @param jobConf Job configuration.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<HadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> format = jobConf.getInputFormat();
+
+            assert format != null;
+
+            InputSplit[] splits = format.getSplits(jobConf, 0);
+
+            Collection<HadoopInputSplit> res = new ArrayList<>(splits.length);
+
+            for (int i = 0; i < splits.length; i++) {
+                InputSplit nativeSplit = splits[i];
+
+                if (nativeSplit instanceof FileSplit) {
+                    FileSplit s = (FileSplit)nativeSplit;
+
+                    res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
+                }
+                else
+                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+            }
+
+            return res;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public static HadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
+        @Nullable String[] hosts) throws IgniteCheckedException {
+        if (!FileSplit.class.getName().equals(clsName))
+            return null;
+
+        FileSplit split = U.newInstance(FileSplit.class);
+
+        try {
+            split.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        if (hosts == null)
+            hosts = EMPTY_HOSTS;
+
+        return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
new file mode 100644
index 0000000..b7da700
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.text.*;
+
+/**
+ * Extended Hadoop v1 task.
+ */
+public abstract class HadoopV1Task extends HadoopTask {
+    /** Indicates that this task is to be cancelled. */
+    private volatile boolean cancelled;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV1Task(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /**
+     * Gets file name for that task result.
+     *
+     * @return File name.
+     */
+    public String fileName() {
+        NumberFormat numFormat = NumberFormat.getInstance();
+
+        numFormat.setMinimumIntegerDigits(5);
+        numFormat.setGroupingUsed(false);
+
+        return "part-" + numFormat.format(info().taskNumber());
+    }
+
+    /**
+     *
+     * @param jobConf Job configuration.
+     * @param taskCtx Task context.
+     * @param directWrite Direct write flag.
+     * @param fileName File name.
+     * @param attempt Attempt of task.
+     * @return Collector.
+     * @throws IOException In case of IO exception.
+     */
+    protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
+        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
+            fileName, attempt) {
+            /** {@inheritDoc} */
+            @Override public void collect(Object key, Object val) throws IOException {
+                if (cancelled)
+                    throw new HadoopTaskCancelledException("Task cancelled.");
+
+                super.collect(key, val);
+            }
+        };
+
+        collector.setup();
+
+        return collector;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+    }
+
+    /** Returns true if task is cancelled. */
+    public boolean isCancelled() {
+        return cancelled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
deleted file mode 100644
index 36b40a2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Split serialized in external file.
- */
-public class GridHadoopExternalSplit extends GridHadoopInputSplit {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long off;
-
-    /**
-     * For {@link Externalizable}.
-     */
-    public GridHadoopExternalSplit() {
-        // No-op.
-    }
-
-    /**
-     * @param hosts Hosts.
-     * @param off Offset of this split in external file.
-     */
-    public GridHadoopExternalSplit(String[] hosts, long off) {
-        assert off >= 0 : off;
-        assert hosts != null;
-
-        this.hosts = hosts;
-        this.off = off;
-    }
-
-    /**
-     * @return Offset of this input split in external file.
-     */
-    public long offset() {
-        return off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(off);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        off = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopExternalSplit that = (GridHadoopExternalSplit) o;
-
-        return off == that.off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return (int)(off ^ (off >>> 32));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
deleted file mode 100644
index 5ef4759..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.*;
-import org.apache.hadoop.conf.*;
-
-/**
- * A fake helper to load the native hadoop code i.e. libhadoop.so.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class GridHadoopNativeCodeLoader {
-    /**
-     * Check if native-hadoop code is loaded for this platform.
-     *
-     * @return <code>true</code> if native-hadoop is loaded,
-     * else <code>false</code>
-     */
-    public static boolean isNativeCodeLoaded() {
-        return false;
-    }
-
-    /**
-     * Returns true only if this build was compiled with support for snappy.
-     */
-    public static boolean buildSupportsSnappy() {
-        return false;
-    }
-
-    /**
-     * @return Library name.
-     */
-    public static String getLibraryName() {
-        throw new IllegalStateException();
-    }
-
-    /**
-     * Return if native hadoop libraries, if present, can be used for this job.
-     * @param conf configuration
-     *
-     * @return <code>true</code> if native hadoop libraries, if present, can be
-     * used for this job; <code>false</code> otherwise.
-     */
-    public boolean getLoadNativeLibraries(Configuration conf) {
-        return false;
-    }
-
-    /**
-     * Set if native hadoop libraries, if present, can be used for this job.
-     *
-     * @param conf configuration
-     * @param loadNativeLibraries can native hadoop libraries be loaded
-     */
-    public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
-        // No-op.
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
deleted file mode 100644
index 0f38548..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.io.serializer.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * The wrapper around external serializer.
- */
-public class GridHadoopSerializationWrapper<T> implements GridHadoopSerialization {
-    /** External serializer - writer. */
-    private final Serializer<T> serializer;
-
-    /** External serializer - reader. */
-    private final Deserializer<T> deserializer;
-
-    /** Data output for current write operation. */
-    private OutputStream currOut;
-
-    /** Data input for current read operation. */
-    private InputStream currIn;
-
-    /** Wrapper around current output to provide OutputStream interface. */
-    private final OutputStream outStream = new OutputStream() {
-        /** {@inheritDoc} */
-        @Override public void write(int b) throws IOException {
-            currOut.write(b);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(byte[] b, int off, int len) throws IOException {
-            currOut.write(b, off, len);
-        }
-    };
-
-    /** Wrapper around current input to provide InputStream interface. */
-    private final InputStream inStream = new InputStream() {
-        /** {@inheritDoc} */
-        @Override public int read() throws IOException {
-            return currIn.read();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int read(byte[] b, int off, int len) throws IOException {
-            return currIn.read(b, off, len);
-        }
-    };
-
-    /**
-     * @param serialization External serializer to wrap.
-     * @param cls The class to serialize.
-     */
-    public GridHadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
-        assert cls != null;
-
-        serializer = serialization.getSerializer(cls);
-        deserializer = serialization.getDeserializer(cls);
-
-        try {
-            serializer.open(outStream);
-            deserializer.open(inStream);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
-        assert out != null;
-        assert obj != null;
-
-        try {
-            currOut = (OutputStream)out;
-
-            serializer.serialize((T)obj);
-
-            currOut = null;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
-        assert in != null;
-
-        try {
-            currIn = (InputStream)in;
-
-            T res = deserializer.deserialize((T) obj);
-
-            currIn = null;
-
-            return res;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        try {
-            serializer.close();
-            deserializer.close();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
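Editorial aside (not part of the commit above): the removed GridHadoopSerializationWrapper adapts Hadoop's pluggable serialization contract, where a Serializer/Deserializer pair is obtained for a class, opened over a stream, and then driven per record. The sketch below is a minimal, hypothetical illustration of that contract using Hadoop's public SerializationFactory and IntWritable; the class name SerializationRoundTrip and the choice of streams are assumptions for the example only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** Hypothetical example: round-trips an IntWritable through Hadoop's serializer/deserializer pair. */
public class SerializationRoundTrip {
    public static void main(String[] args) throws IOException {
        // Resolve the serialization registered for the target class (WritableSerialization for IntWritable).
        SerializationFactory factory = new SerializationFactory(new Configuration());

        Serializer<IntWritable> ser = factory.getSerializer(IntWritable.class);
        Deserializer<IntWritable> deser = factory.getDeserializer(IntWritable.class);

        // Write side: open the serializer over an output stream, then serialize values into it.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        ser.open(bytes);
        ser.serialize(new IntWritable(42));
        ser.close();

        // Read side: open the deserializer over the produced bytes and read the value back.
        deser.open(new ByteArrayInputStream(bytes.toByteArray()));

        IntWritable restored = deser.deserialize(null);

        deser.close();

        assert restored.get() == 42;
    }
}

The wrapper in the diff follows the same open/serialize/deserialize/close sequence, but keeps the serializer and deserializer open for the lifetime of the wrapper and redirects their streams to whichever DataOutput/DataInput the current call supplies.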