http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java new file mode 100644 index 0000000..24f10a6 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.fs.*; +import org.apache.ignite.internal.processors.hadoop.v1.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Context for task execution. + */ +public class HadoopV2TaskContext extends HadoopTaskContext { + /** */ + private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; + + /** + * Check for combiner grouping support (available since Hadoop 2.3). + */ + static { + boolean ok; + + try { + JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator"); + + ok = true; + } + catch (NoSuchMethodException ignore) { + ok = false; + } + + COMBINE_KEY_GROUPING_SUPPORTED = ok; + } + + /** Flag is set if new context-object code is used for running the mapper. */ + private final boolean useNewMapper; + + /** Flag is set if new context-object code is used for running the reducer. */ + private final boolean useNewReducer; + + /** Flag is set if new context-object code is used for running the combiner. */ + private final boolean useNewCombiner; + + /** */ + private final JobContextImpl jobCtx; + + /** Set if task is to cancelling. 
*/ + private volatile boolean cancelled; + + /** Current task. */ + private volatile HadoopTask task; + + /** Local node ID */ + private UUID locNodeId; + + /** Counters for task. */ + private final HadoopCounters cntrs = new HadoopCountersImpl(); + + /** + * @param taskInfo Task info. + * @param job Job. + * @param jobId Job ID. + * @param locNodeId Local node ID. + * @param jobConfDataInput DataInput for read JobConf. + */ + public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId, + @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { + super(taskInfo, job); + this.locNodeId = locNodeId; + + // Before create JobConf instance we should set new context class loader. + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + try { + JobConf jobConf = new JobConf(); + + try { + jobConf.readFields(jobConfDataInput); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + // For map-reduce jobs prefer local writes. + jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true); + + jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId())); + + useNewMapper = jobConf.getUseNewMapper(); + useNewReducer = jobConf.getUseNewReducer(); + useNewCombiner = jobConf.getCombinerClass() == null; + } + finally { + Thread.currentThread().setContextClassLoader(null); + } + } + + /** {@inheritDoc} */ + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { + return cntrs.counter(grp, name, cls); + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters() { + return cntrs; + } + + /** + * Creates appropriate task from current task info. + * + * @return Task. + */ + private HadoopTask createTask() { + boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT; + + switch (taskInfo().type()) { + case SETUP: + return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo()); + + case MAP: + return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo()); + + case REDUCE: + return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) : + new HadoopV1ReduceTask(taskInfo(), true); + + case COMBINE: + return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) : + new HadoopV1ReduceTask(taskInfo(), false); + + case COMMIT: + case ABORT: + return useNewReducer ? 
new HadoopV2CleanupTask(taskInfo(), isAbort) : + new HadoopV1CleanupTask(taskInfo(), isAbort); + + default: + return null; + } + } + + /** {@inheritDoc} */ + @Override public void run() throws IgniteCheckedException { + try { + Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); + + try { + task = createTask(); + } + catch (Throwable e) { + throw transformException(e); + } + + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + try { + task.run(this); + } + catch (Throwable e) { + throw transformException(e); + } + } + finally { + task = null; + + Thread.currentThread().setContextClassLoader(null); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelled = true; + + HadoopTask t = task; + + if (t != null) + t.cancel(); + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment() throws IgniteCheckedException { + File locDir; + + switch(taskInfo().type()) { + case MAP: + case REDUCE: + job().prepareTaskEnvironment(taskInfo()); + + locDir = taskLocalDir(locNodeId, taskInfo()); + + break; + + default: + locDir = jobLocalDir(locNodeId, taskInfo().jobId()); + } + + Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); + + try { + FileSystem fs = FileSystem.get(jobConf()); + + HadoopFileSystemsUtils.setUser(fs, jobConf().getUser()); + + LocalFileSystem locFs = FileSystem.getLocal(jobConf()); + + locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath())); + } + catch (Throwable e) { + throw transformException(e); + } + finally { + Thread.currentThread().setContextClassLoader(null); + } + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { + job().cleanupTaskEnvironment(taskInfo()); + } + + /** + * Creates Hadoop attempt ID. + * + * @return Attempt ID. + */ + public TaskAttemptID attemptId() { + TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber()); + + return new TaskAttemptID(tid, taskInfo().attempt()); + } + + /** + * @param type Task type. + * @return Hadoop task type. + */ + private TaskType taskType(HadoopTaskType type) { + switch (type) { + case SETUP: + return TaskType.JOB_SETUP; + case MAP: + case COMBINE: + return TaskType.MAP; + + case REDUCE: + return TaskType.REDUCE; + + case COMMIT: + case ABORT: + return TaskType.JOB_CLEANUP; + + default: + return null; + } + } + + /** + * Gets job configuration of the task. + * + * @return Job configuration. + */ + public JobConf jobConf() { + return jobCtx.getJobConf(); + } + + /** + * Gets job context of the task. + * + * @return Job context. + */ + public JobContextImpl jobContext() { + return jobCtx; + } + + /** {@inheritDoc} */ + @Override public HadoopPartitioner partitioner() throws IgniteCheckedException { + Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null); + + if (partClsOld != null) + return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf()); + + try { + return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf()); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Gets serializer for specified class. + * + * @param cls Class. + * @param jobConf Job configuration. + * @return Appropriate serializer. 
+ */ + @SuppressWarnings("unchecked") + private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException { + A.notNull(cls, "cls"); + + SerializationFactory factory = new SerializationFactory(jobConf); + + Serialization<?> serialization = factory.getSerialization(cls); + + if (serialization == null) + throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName()); + + if (serialization.getClass() == WritableSerialization.class) + return new HadoopWritableSerialization((Class<? extends Writable>)cls); + + return new HadoopSerializationWrapper(serialization, cls); + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization keySerialization() throws IgniteCheckedException { + return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf()); + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException { + return getSerialization(jobCtx.getMapOutputValueClass(), jobConf()); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> sortComparator() { + return (Comparator<Object>)jobCtx.getSortComparator(); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> groupComparator() { + Comparator<?> res; + + switch (taskInfo().type()) { + case COMBINE: + res = COMBINE_KEY_GROUPING_SUPPORTED ? + jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator(); + + break; + + case REDUCE: + res = jobContext().getGroupingComparator(); + + break; + + default: + return null; + } + + if (res != null && res.getClass() != sortComparator().getClass()) + return (Comparator<Object>)res; + + return null; + } + + /** + * @param split Split. + * @return Native Hadoop split. + * @throws IgniteCheckedException if failed. + */ + @SuppressWarnings("unchecked") + public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException { + if (split instanceof HadoopExternalSplit) + return readExternalSplit((HadoopExternalSplit)split); + + if (split instanceof HadoopSplitWrapper) + return unwrapSplit((HadoopSplitWrapper)split); + + throw new IllegalStateException("Unknown split: " + split); + } + + /** + * @param split External split. + * @return Native input split. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { + Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); + + try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf()); + FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { + + in.seek(split.offset()); + + String clsName = Text.readString(in); + + Class<?> cls = jobConf().getClassByName(clsName); + + assert cls != null; + + Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls); + + Deserializer deserializer = serialization.getDeserializer(cls); + + deserializer.open(in); + + Object res = deserializer.deserialize(null); + + deserializer.close(); + + assert res != null; + + return res; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java new file mode 100644 index 0000000..3920dd5 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.io.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Optimized serialization for Hadoop {@link Writable} types. + */ +public class HadoopWritableSerialization implements HadoopSerialization { + /** */ + private final Class<? extends Writable> cls; + + /** + * @param cls Class. + */ + public HadoopWritableSerialization(Class<? extends Writable> cls) { + assert cls != null; + + this.cls = cls; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { + assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass(); + + try { + ((Writable)obj).write(out); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { + Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj); + + try { + w.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return w; + } + + /** {@inheritDoc} */ + @Override public void close() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider index fe35d5e..8d5957b 100644 --- a/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider +++ b/modules/hadoop/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider @@ -1 +1 @@ -org.apache.ignite.client.hadoop.GridHadoopClientProtocolProvider +org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java deleted file mode 100644 index 780ce67..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolEmbeddedSelfTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.hadoop; - -import org.apache.ignite.internal.processors.hadoop.*; - -/** - * Hadoop client protocol tests in embedded process mode. 
- */ -public class GridHadoopClientProtocolEmbeddedSelfTest extends GridHadoopClientProtocolSelfTest { - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(false); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java deleted file mode 100644 index ff8798b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/GridHadoopClientProtocolSelfTest.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.client.hadoop; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.hadoop.mapreduce.protocol.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.*; - -import java.io.*; -import java.util.*; - -/** - * Hadoop client protocol tests in external process mode. - */ -@SuppressWarnings("ResultOfMethodCallIgnored") -public class GridHadoopClientProtocolSelfTest extends GridHadoopAbstractSelfTest { - /** Input path. */ - private static final String PATH_INPUT = "/input"; - - /** Output path. */ - private static final String PATH_OUTPUT = "/output"; - - /** Job name. */ - private static final String JOB_NAME = "myJob"; - - /** Setup lock file. */ - private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", - "ignite-lock-setup.file"); - - /** Map lock file. */ - private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", - "ignite-lock-map.file"); - - /** Reduce lock file. */ - private static File reduceLockFile = new File(U.isWindows() ? 
System.getProperty("java.io.tmpdir") : "/tmp", - "ignite-lock-reduce.file"); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected boolean restEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrids(gridCount()); - - setupLockFile.delete(); - mapLockFile.delete(); - reduceLockFile.delete(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - -// GridHadoopClientProtocolProvider.cliMap.clear(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - setupLockFile.createNewFile(); - mapLockFile.createNewFile(); - reduceLockFile.createNewFile(); - - setupLockFile.deleteOnExit(); - mapLockFile.deleteOnExit(); - reduceLockFile.deleteOnExit(); - - super.beforeTest(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName).format(); - - setupLockFile.delete(); - mapLockFile.delete(); - reduceLockFile.delete(); - - super.afterTest(); - } - - /** - * Test next job ID generation. - * - * @throws Exception If failed. - */ - @SuppressWarnings("ConstantConditions") - private void tstNextJobId() throws Exception { - GridHadoopClientProtocolProvider provider = provider(); - - ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT)); - - JobID jobId = proto.getNewJobID(); - - assert jobId != null; - assert jobId.getJtIdentifier() != null; - - JobID nextJobId = proto.getNewJobID(); - - assert nextJobId != null; - assert nextJobId.getJtIdentifier() != null; - - assert !F.eq(jobId, nextJobId); - } - - /** - * Tests job counters retrieval. - * - * @throws Exception If failed. - */ - public void testJobCounters() throws Exception { - IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName); - - igfs.mkdirs(new IgfsPath(PATH_INPUT)); - - try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( - new IgfsPath(PATH_INPUT + "/test.file"), true)))) { - - bw.write( - "alpha\n" + - "beta\n" + - "gamma\n" + - "alpha\n" + - "beta\n" + - "gamma\n" + - "alpha\n" + - "beta\n" + - "gamma\n" - ); - } - - Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT); - - final Job job = Job.getInstance(conf); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestCountingMapper.class); - job.setReducerClass(TestCountingReducer.class); - job.setCombinerClass(TestCountingCombiner.class); - - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); - - job.submit(); - - final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1); - - assertEquals(0, cntr.getValue()); - - cntr.increment(10); - - assertEquals(10, cntr.getValue()); - - // Transferring to map phase. - setupLockFile.delete(); - - // Transferring to reduce phase. 
- mapLockFile.delete(); - - job.waitForCompletion(false); - - assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState()); - - final Counters counters = job.getCounters(); - - assertNotNull("counters cannot be null", counters); - assertEquals("wrong counters count", 3, counters.countCounters()); - assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue()); - assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue()); - assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue()); - } - - /** - * Tests job counters retrieval for unknown job id. - * - * @throws Exception If failed. - */ - private void tstUnknownJobCounters() throws Exception { - GridHadoopClientProtocolProvider provider = provider(); - - ClientProtocol proto = provider.create(config(GridHadoopAbstractSelfTest.REST_PORT)); - - try { - proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1)); - fail("exception must be thrown"); - } - catch (Exception e) { - assert e instanceof IOException : "wrong error has been thrown"; - } - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMap() throws Exception { - checkJobSubmit(true, true); - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMapCombine() throws Exception { - checkJobSubmit(false, true); - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMapReduce() throws Exception { - checkJobSubmit(true, false); - } - - /** - * @throws Exception If failed. - */ - private void tstJobSubmitMapCombineReduce() throws Exception { - checkJobSubmit(false, false); - } - - /** - * Test job submission. - * - * @param noCombiners Whether there are no combiners. - * @param noReducers Whether there are no reducers. - * @throws Exception If failed. - */ - public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception { - IgniteFs igfs = grid(0).fileSystem(GridHadoopAbstractSelfTest.igfsName); - - igfs.mkdirs(new IgfsPath(PATH_INPUT)); - - try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( - new IgfsPath(PATH_INPUT + "/test.file"), true)))) { - - bw.write("word"); - } - - Configuration conf = config(GridHadoopAbstractSelfTest.REST_PORT); - - final Job job = Job.getInstance(conf); - - job.setJobName(JOB_NAME); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setMapperClass(TestMapper.class); - job.setReducerClass(TestReducer.class); - - if (!noCombiners) - job.setCombinerClass(TestCombiner.class); - - if (noReducers) - job.setNumReduceTasks(0); - - job.setInputFormatClass(TextInputFormat.class); - job.setOutputFormatClass(TestOutputFormat.class); - - FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); - FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); - - job.submit(); - - JobID jobId = job.getJobID(); - - // Setup phase. 
- JobStatus jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f; - assert jobStatus.getMapProgress() == 0.0f; - assert jobStatus.getReduceProgress() == 0.0f; - - U.sleep(2100); - - JobStatus recentJobStatus = job.getStatus(); - - assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : - "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress(); - - // Transferring to map phase. - setupLockFile.delete(); - - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - return F.eq(1.0f, job.getStatus().getSetupProgress()); - } - catch (Exception e) { - throw new RuntimeException("Unexpected exception.", e); - } - } - }, 5000L); - - // Map phase. - jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f; - assert jobStatus.getReduceProgress() == 0.0f; - - U.sleep(2100); - - recentJobStatus = job.getStatus(); - - assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : - "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress(); - - // Transferring to reduce phase. - mapLockFile.delete(); - - assert GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - try { - return F.eq(1.0f, job.getStatus().getMapProgress()); - } - catch (Exception e) { - throw new RuntimeException("Unexpected exception.", e); - } - } - }, 5000L); - - if (!noReducers) { - // Reduce phase. - jobStatus = job.getStatus(); - checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() == 1.0f; - assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f; - - // Ensure that reduces progress increases. - U.sleep(2100); - - recentJobStatus = job.getStatus(); - - assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : - "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress(); - - reduceLockFile.delete(); - } - - job.waitForCompletion(false); - - jobStatus = job.getStatus(); - checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f); - assert jobStatus.getSetupProgress() == 1.0f; - assert jobStatus.getMapProgress() == 1.0f; - assert jobStatus.getReduceProgress() == 1.0f; - - dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT)); - } - - /** - * Dump IGFS content. - * - * @param igfs IGFS. - * @param path Path. - * @throws Exception If failed. - */ - @SuppressWarnings("ConstantConditions") - private static void dumpIgfs(IgniteFs igfs, IgfsPath path) throws Exception { - IgfsFile file = igfs.info(path); - - assert file != null; - - System.out.println(file.path()); - - if (file.isDirectory()) { - for (IgfsPath child : igfs.listPaths(path)) - dumpIgfs(igfs, child); - } - else { - try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) { - String line = br.readLine(); - - while (line != null) { - System.out.println(line); - - line = br.readLine(); - } - } - } - } - - /** - * Check job status. - * - * @param status Job status. - * @param expJobId Expected job ID. - * @param expJobName Expected job name. - * @param expState Expected state. 
- * @param expCleanupProgress Expected cleanup progress. - * @throws Exception If failed. - */ - private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName, - JobStatus.State expState, float expCleanupProgress) throws Exception { - assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID(); - assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName(); - assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState(); - assert F.eq(status.getCleanupProgress(), expCleanupProgress) : - "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress(); - } - - /** - * @return Configuration. - */ - private Configuration config(int port) { - Configuration conf = new Configuration(); - - setupFileSystems(conf); - - conf.set(MRConfig.FRAMEWORK_NAME, GridHadoopClientProtocol.FRAMEWORK_NAME); - conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port); - - conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/"); - - return conf; - } - - /** - * @return Protocol provider. - */ - private GridHadoopClientProtocolProvider provider() { - return new GridHadoopClientProtocolProvider(); - } - - /** - * Test mapper. - */ - public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { - /** Writable container for writing word. */ - private Text word = new Text(); - - /** Writable integer constant of '1' is writing as count of found words. */ - private static final IntWritable one = new IntWritable(1); - - /** {@inheritDoc} */ - @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - while (mapLockFile.exists()) - Thread.sleep(50); - - StringTokenizer wordList = new StringTokenizer(val.toString()); - - while (wordList.hasMoreTokens()) { - word.set(wordList.nextToken()); - - ctx.write(word, one); - } - } - } - - /** - * Test Hadoop counters. - */ - public enum TestCounter { - COUNTER1, COUNTER2, COUNTER3 - } - - /** - * Test mapper that uses counters. - */ - public static class TestCountingMapper extends TestMapper { - /** {@inheritDoc} */ - @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - super.map(key, val, ctx); - ctx.getCounter(TestCounter.COUNTER1).increment(1); - } - } - - /** - * Test combiner that counts invocations. - */ - public static class TestCountingCombiner extends TestReducer { - @Override public void reduce(Text key, Iterable<IntWritable> values, - Context ctx) throws IOException, InterruptedException { - ctx.getCounter(TestCounter.COUNTER1).increment(1); - ctx.getCounter(TestCounter.COUNTER2).increment(1); - - int sum = 0; - for (IntWritable value : values) { - sum += value.get(); - } - - ctx.write(key, new IntWritable(sum)); - } - } - - /** - * Test reducer that counts invocations. - */ - public static class TestCountingReducer extends TestReducer { - @Override public void reduce(Text key, Iterable<IntWritable> values, - Context ctx) throws IOException, InterruptedException { - ctx.getCounter(TestCounter.COUNTER1).increment(1); - ctx.getCounter(TestCounter.COUNTER3).increment(1); - } - } - - /** - * Test combiner. - */ - public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { - // No-op. 
- } - - public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> { - /** {@inheritDoc} */ - @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx) - throws IOException { - return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx)); - } - } - - /** - * Test output committer. - */ - private static class TestOutputCommitter extends FileOutputCommitter { - /** Delegate. */ - private final FileOutputCommitter delegate; - - /** - * Constructor. - * - * @param ctx Task attempt context. - * @param delegate Delegate. - * @throws IOException If failed. - */ - private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException { - super(FileOutputFormat.getOutputPath(ctx), ctx); - - this.delegate = delegate; - } - - /** {@inheritDoc} */ - @Override public void setupJob(JobContext jobCtx) throws IOException { - try { - while (setupLockFile.exists()) - Thread.sleep(50); - } - catch (InterruptedException ignored) { - throw new IOException("Interrupted."); - } - - delegate.setupJob(jobCtx); - } - - /** {@inheritDoc} */ - @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException { - delegate.setupTask(taskCtx); - } - - /** {@inheritDoc} */ - @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException { - return delegate.needsTaskCommit(taskCtx); - } - - /** {@inheritDoc} */ - @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException { - delegate.commitTask(taskCtx); - } - - /** {@inheritDoc} */ - @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException { - delegate.abortTask(taskCtx); - } - } - - /** - * Test reducer. - */ - public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { - /** Writable container for writing sum of word counts. */ - private IntWritable totalWordCnt = new IntWritable(); - - /** {@inheritDoc} */ - @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, - InterruptedException { - while (reduceLockFile.exists()) - Thread.sleep(50); - - int wordCnt = 0; - - for (IntWritable value : values) - wordCnt += value.get(); - - totalWordCnt.set(wordCnt); - - ctx.write(key, totalWordCnt); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java new file mode 100644 index 0000000..ffa20d1 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolEmbeddedSelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.hadoop; + +import org.apache.ignite.configuration.*; + +/** + * Hadoop client protocol tests in embedded process mode. + */ +public class HadoopClientProtocolEmbeddedSelfTest extends HadoopClientProtocolSelfTest { + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(false); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java new file mode 100644 index 0000000..d19a8ea --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java @@ -0,0 +1,635 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.hadoop.mapreduce.protocol.*; +import org.apache.ignite.*; +import org.apache.ignite.hadoop.mapreduce.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.proto.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.util.*; + +/** + * Hadoop client protocol tests in external process mode. + */ +@SuppressWarnings("ResultOfMethodCallIgnored") +public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest { + /** Input path. */ + private static final String PATH_INPUT = "/input"; + + /** Output path. */ + private static final String PATH_OUTPUT = "/output"; + + /** Job name. 
*/ + private static final String JOB_NAME = "myJob"; + + /** Setup lock file. */ + private static File setupLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", + "ignite-lock-setup.file"); + + /** Map lock file. */ + private static File mapLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", + "ignite-lock-map.file"); + + /** Reduce lock file. */ + private static File reduceLockFile = new File(U.isWindows() ? System.getProperty("java.io.tmpdir") : "/tmp", + "ignite-lock-reduce.file"); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected boolean restEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + + setupLockFile.delete(); + mapLockFile.delete(); + reduceLockFile.delete(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + +// IgniteHadoopClientProtocolProvider.cliMap.clear(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + setupLockFile.createNewFile(); + mapLockFile.createNewFile(); + reduceLockFile.createNewFile(); + + setupLockFile.deleteOnExit(); + mapLockFile.deleteOnExit(); + reduceLockFile.deleteOnExit(); + + super.beforeTest(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid(0).fileSystem(HadoopAbstractSelfTest.igfsName).format(); + + setupLockFile.delete(); + mapLockFile.delete(); + reduceLockFile.delete(); + + super.afterTest(); + } + + /** + * Test next job ID generation. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + private void tstNextJobId() throws Exception { + IgniteHadoopClientProtocolProvider provider = provider(); + + ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); + + JobID jobId = proto.getNewJobID(); + + assert jobId != null; + assert jobId.getJtIdentifier() != null; + + JobID nextJobId = proto.getNewJobID(); + + assert nextJobId != null; + assert nextJobId.getJtIdentifier() != null; + + assert !F.eq(jobId, nextJobId); + } + + /** + * Tests job counters retrieval. + * + * @throws Exception If failed. 
+ */ + public void testJobCounters() throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); + + igfs.mkdirs(new IgfsPath(PATH_INPUT)); + + try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( + new IgfsPath(PATH_INPUT + "/test.file"), true)))) { + + bw.write( + "alpha\n" + + "beta\n" + + "gamma\n" + + "alpha\n" + + "beta\n" + + "gamma\n" + + "alpha\n" + + "beta\n" + + "gamma\n" + ); + } + + Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); + + final Job job = Job.getInstance(conf); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestCountingMapper.class); + job.setReducerClass(TestCountingReducer.class); + job.setCombinerClass(TestCountingCombiner.class); + + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); + + job.submit(); + + final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1); + + assertEquals(0, cntr.getValue()); + + cntr.increment(10); + + assertEquals(10, cntr.getValue()); + + // Transferring to map phase. + setupLockFile.delete(); + + // Transferring to reduce phase. + mapLockFile.delete(); + + job.waitForCompletion(false); + + assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState()); + + final Counters counters = job.getCounters(); + + assertNotNull("counters cannot be null", counters); + assertEquals("wrong counters count", 3, counters.countCounters()); + assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue()); + assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue()); + assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue()); + } + + /** + * Tests job counters retrieval for unknown job id. + * + * @throws Exception If failed. + */ + private void tstUnknownJobCounters() throws Exception { + IgniteHadoopClientProtocolProvider provider = provider(); + + ClientProtocol proto = provider.create(config(HadoopAbstractSelfTest.REST_PORT)); + + try { + proto.getJobCounters(new JobID(UUID.randomUUID().toString(), -1)); + fail("exception must be thrown"); + } + catch (Exception e) { + assert e instanceof IOException : "wrong error has been thrown"; + } + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMap() throws Exception { + checkJobSubmit(true, true); + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMapCombine() throws Exception { + checkJobSubmit(false, true); + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMapReduce() throws Exception { + checkJobSubmit(true, false); + } + + /** + * @throws Exception If failed. + */ + private void tstJobSubmitMapCombineReduce() throws Exception { + checkJobSubmit(false, false); + } + + /** + * Test job submission. + * + * @param noCombiners Whether there are no combiners. + * @param noReducers Whether there are no reducers. + * @throws Exception If failed. 
+ */ + public void checkJobSubmit(boolean noCombiners, boolean noReducers) throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName); + + igfs.mkdirs(new IgfsPath(PATH_INPUT)); + + try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create( + new IgfsPath(PATH_INPUT + "/test.file"), true)))) { + + bw.write("word"); + } + + Configuration conf = config(HadoopAbstractSelfTest.REST_PORT); + + final Job job = Job.getInstance(conf); + + job.setJobName(JOB_NAME); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestMapper.class); + job.setReducerClass(TestReducer.class); + + if (!noCombiners) + job.setCombinerClass(TestCombiner.class); + + if (noReducers) + job.setNumReduceTasks(0); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(TestOutputFormat.class); + + FileInputFormat.setInputPaths(job, new Path(PATH_INPUT)); + FileOutputFormat.setOutputPath(job, new Path(PATH_OUTPUT)); + + job.submit(); + + JobID jobId = job.getJobID(); + + // Setup phase. + JobStatus jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() >= 0.0f && jobStatus.getSetupProgress() < 1.0f; + assert jobStatus.getMapProgress() == 0.0f; + assert jobStatus.getReduceProgress() == 0.0f; + + U.sleep(2100); + + JobStatus recentJobStatus = job.getStatus(); + + assert recentJobStatus.getSetupProgress() > jobStatus.getSetupProgress() : + "Old=" + jobStatus.getSetupProgress() + ", new=" + recentJobStatus.getSetupProgress(); + + // Transferring to map phase. + setupLockFile.delete(); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return F.eq(1.0f, job.getStatus().getSetupProgress()); + } + catch (Exception e) { + throw new RuntimeException("Unexpected exception.", e); + } + } + }, 5000L); + + // Map phase. + jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() >= 0.0f && jobStatus.getMapProgress() < 1.0f; + assert jobStatus.getReduceProgress() == 0.0f; + + U.sleep(2100); + + recentJobStatus = job.getStatus(); + + assert recentJobStatus.getMapProgress() > jobStatus.getMapProgress() : + "Old=" + jobStatus.getMapProgress() + ", new=" + recentJobStatus.getMapProgress(); + + // Transferring to reduce phase. + mapLockFile.delete(); + + assert GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + return F.eq(1.0f, job.getStatus().getMapProgress()); + } + catch (Exception e) { + throw new RuntimeException("Unexpected exception.", e); + } + } + }, 5000L); + + if (!noReducers) { + // Reduce phase. + jobStatus = job.getStatus(); + checkJobStatus(jobStatus, jobId, JOB_NAME, JobStatus.State.RUNNING, 0.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() == 1.0f; + assert jobStatus.getReduceProgress() >= 0.0f && jobStatus.getReduceProgress() < 1.0f; + + // Ensure that reduces progress increases. 
+ U.sleep(2100); + + recentJobStatus = job.getStatus(); + + assert recentJobStatus.getReduceProgress() > jobStatus.getReduceProgress() : + "Old=" + jobStatus.getReduceProgress() + ", new=" + recentJobStatus.getReduceProgress(); + + reduceLockFile.delete(); + } + + job.waitForCompletion(false); + + jobStatus = job.getStatus(); + checkJobStatus(job.getStatus(), jobId, JOB_NAME, JobStatus.State.SUCCEEDED, 1.0f); + assert jobStatus.getSetupProgress() == 1.0f; + assert jobStatus.getMapProgress() == 1.0f; + assert jobStatus.getReduceProgress() == 1.0f; + + dumpIgfs(igfs, new IgfsPath(PATH_OUTPUT)); + } + + /** + * Dump IGFS content. + * + * @param igfs IGFS. + * @param path Path. + * @throws Exception If failed. + */ + @SuppressWarnings("ConstantConditions") + private static void dumpIgfs(IgniteFileSystem igfs, IgfsPath path) throws Exception { + IgfsFile file = igfs.info(path); + + assert file != null; + + System.out.println(file.path()); + + if (file.isDirectory()) { + for (IgfsPath child : igfs.listPaths(path)) + dumpIgfs(igfs, child); + } + else { + try (BufferedReader br = new BufferedReader(new InputStreamReader(igfs.open(path)))) { + String line = br.readLine(); + + while (line != null) { + System.out.println(line); + + line = br.readLine(); + } + } + } + } + + /** + * Check job status. + * + * @param status Job status. + * @param expJobId Expected job ID. + * @param expJobName Expected job name. + * @param expState Expected state. + * @param expCleanupProgress Expected cleanup progress. + * @throws Exception If failed. + */ + private static void checkJobStatus(JobStatus status, JobID expJobId, String expJobName, + JobStatus.State expState, float expCleanupProgress) throws Exception { + assert F.eq(status.getJobID(), expJobId) : "Expected=" + expJobId + ", actual=" + status.getJobID(); + assert F.eq(status.getJobName(), expJobName) : "Expected=" + expJobName + ", actual=" + status.getJobName(); + assert F.eq(status.getState(), expState) : "Expected=" + expState + ", actual=" + status.getState(); + assert F.eq(status.getCleanupProgress(), expCleanupProgress) : + "Expected=" + expCleanupProgress + ", actual=" + status.getCleanupProgress(); + } + + /** + * @return Configuration. + */ + private Configuration config(int port) { + Configuration conf = new Configuration(); + + setupFileSystems(conf); + + conf.set(MRConfig.FRAMEWORK_NAME, HadoopClientProtocol.FRAMEWORK_NAME); + conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:" + port); + + conf.set("fs.defaultFS", "igfs://:" + getTestGridName(0) + "@/"); + + return conf; + } + + /** + * @return Protocol provider. + */ + private IgniteHadoopClientProtocolProvider provider() { + return new IgniteHadoopClientProtocolProvider(); + } + + /** + * Test mapper. + */ + public static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { + /** Writable container for writing word. */ + private Text word = new Text(); + + /** Writable integer constant of '1' is writing as count of found words. */ + private static final IntWritable one = new IntWritable(1); + + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + while (mapLockFile.exists()) + Thread.sleep(50); + + StringTokenizer wordList = new StringTokenizer(val.toString()); + + while (wordList.hasMoreTokens()) { + word.set(wordList.nextToken()); + + ctx.write(word, one); + } + } + } + + /** + * Test Hadoop counters. 
+ */ + public enum TestCounter { + COUNTER1, COUNTER2, COUNTER3 + } + + /** + * Test mapper that uses counters. + */ + public static class TestCountingMapper extends TestMapper { + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + super.map(key, val, ctx); + ctx.getCounter(TestCounter.COUNTER1).increment(1); + } + } + + /** + * Test combiner that counts invocations. + */ + public static class TestCountingCombiner extends TestReducer { + @Override public void reduce(Text key, Iterable<IntWritable> values, + Context ctx) throws IOException, InterruptedException { + ctx.getCounter(TestCounter.COUNTER1).increment(1); + ctx.getCounter(TestCounter.COUNTER2).increment(1); + + int sum = 0; + for (IntWritable value : values) { + sum += value.get(); + } + + ctx.write(key, new IntWritable(sum)); + } + } + + /** + * Test reducer that counts invocations. + */ + public static class TestCountingReducer extends TestReducer { + @Override public void reduce(Text key, Iterable<IntWritable> values, + Context ctx) throws IOException, InterruptedException { + ctx.getCounter(TestCounter.COUNTER1).increment(1); + ctx.getCounter(TestCounter.COUNTER3).increment(1); + } + } + + /** + * Test combiner. + */ + public static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { + // No-op. + } + + public static class TestOutputFormat<K, V> extends TextOutputFormat<K, V> { + /** {@inheritDoc} */ + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext ctx) + throws IOException { + return new TestOutputCommitter(ctx, (FileOutputCommitter)super.getOutputCommitter(ctx)); + } + } + + /** + * Test output committer. + */ + private static class TestOutputCommitter extends FileOutputCommitter { + /** Delegate. */ + private final FileOutputCommitter delegate; + + /** + * Constructor. + * + * @param ctx Task attempt context. + * @param delegate Delegate. + * @throws IOException If failed. + */ + private TestOutputCommitter(TaskAttemptContext ctx, FileOutputCommitter delegate) throws IOException { + super(FileOutputFormat.getOutputPath(ctx), ctx); + + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public void setupJob(JobContext jobCtx) throws IOException { + try { + while (setupLockFile.exists()) + Thread.sleep(50); + } + catch (InterruptedException ignored) { + throw new IOException("Interrupted."); + } + + delegate.setupJob(jobCtx); + } + + /** {@inheritDoc} */ + @Override public void setupTask(TaskAttemptContext taskCtx) throws IOException { + delegate.setupTask(taskCtx); + } + + /** {@inheritDoc} */ + @Override public boolean needsTaskCommit(TaskAttemptContext taskCtx) throws IOException { + return delegate.needsTaskCommit(taskCtx); + } + + /** {@inheritDoc} */ + @Override public void commitTask(TaskAttemptContext taskCtx) throws IOException { + delegate.commitTask(taskCtx); + } + + /** {@inheritDoc} */ + @Override public void abortTask(TaskAttemptContext taskCtx) throws IOException { + delegate.abortTask(taskCtx); + } + } + + /** + * Test reducer. + */ + public static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + /** Writable container for writing sum of word counts. 
*/ + private IntWritable totalWordCnt = new IntWritable(); + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, + InterruptedException { + while (reduceLockFile.exists()) + Thread.sleep(50); + + int wordCnt = 0; + + for (IntWritable value : values) + wordCnt += value.get(); + + totalWordCnt.set(wordCnt); + + ctx.write(key, totalWordCnt); + } + } +}
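Note on the test above: its config() method shows the complete client-side wiring for running MapReduce through Ignite — set MRConfig.FRAMEWORK_NAME to Ignite's framework name and MRConfig.MASTER_ADDRESS to an Ignite REST endpoint, and the renamed IgniteHadoopClientProtocolProvider (see the META-INF/services change earlier in this commit) takes over job submission. Below is a minimal word-count driver sketch built on those two settings. It is a sketch, not code from the commit: the class names, the 127.0.0.1:11211 address, and the use of command-line paths are illustrative assumptions, and a real deployment would typically also point fs.defaultFS at IGFS as the test does.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopClientProtocol;

public class IgniteWordCountSketch {
    /** Tokenizing mapper, analogous to TestMapper in the test above. */
    public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override protected void map(LongWritable key, Text val, Context ctx)
            throws IOException, InterruptedException {
            StringTokenizer tok = new StringTokenizer(val.toString());

            while (tok.hasMoreTokens()) {
                word.set(tok.nextToken());

                ctx.write(word, ONE);
            }
        }
    }

    /** Summing reducer, analogous to TestReducer in the test above. */
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override protected void reduce(Text key, Iterable<IntWritable> vals, Context ctx)
            throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable v : vals)
                sum += v.get();

            total.set(sum);

            ctx.write(key, total);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Route submission through Ignite: IgniteHadoopClientProtocolProvider is
        // discovered via the META-INF/services entry changed in this commit.
        conf.set(MRConfig.FRAMEWORK_NAME, HadoopClientProtocol.FRAMEWORK_NAME);
        conf.set(MRConfig.MASTER_ADDRESS, "127.0.0.1:11211"); // Ignite REST address; port is deployment-specific.

        Job job = Job.getInstance(conf);

        job.setJobName("wordCount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(SumReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}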