http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java index af3f872..a959472 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksAllVersionsTest.java @@ -43,7 +43,7 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun * @return Hadoop job. * @throws IOException If fails. */ - public abstract GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; /** * @return prefix of reducer output file name. 
It's "part-" for v1 and "part-r-" for v2 API @@ -79,11 +79,11 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), igfs.info(inFile).length() - fileBlock1.length()); - GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); - GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob); + HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); ctx.mockOutput().clear(); @@ -110,11 +110,11 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun * @return Context with mock output. * @throws IgniteCheckedException If fails. */ - private GridHadoopTestTaskContext runTaskWithInput(GridHadoopV2Job gridJob, GridHadoopTaskType taskType, + private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, GridHadoopTaskType taskType, int taskNum, String... words) throws IgniteCheckedException { GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); - GridHadoopTestTaskContext ctx = new GridHadoopTestTaskContext(taskInfo, gridJob); + HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); for (int i = 0; i < words.length; i+=2) { List<IntWritable> valList = new ArrayList<>(); @@ -136,7 +136,7 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun * @throws Exception If fails. 
*/ public void testReduceTask() throws Exception { - GridHadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); runTaskWithInput(gridJob, GridHadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); @@ -162,9 +162,9 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun * @throws Exception If fails. */ public void testCombinerTask() throws Exception { - GridHadoopV2Job gridJob = getHadoopJob("/", "/"); + HadoopV2Job gridJob = getHadoopJob("/", "/"); - GridHadoopTestTaskContext ctx = + HadoopTestTaskContext ctx = runTaskWithInput(gridJob, GridHadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput())); @@ -182,18 +182,18 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun * @return Context of combine task with mock output. * @throws IgniteCheckedException If fails. 
*/ - private GridHadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, GridHadoopV2Job gridJob) + private HadoopTestTaskContext runMapCombineTask(GridHadoopFileBlock fileBlock, HadoopV2Job gridJob) throws IgniteCheckedException { GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); - GridHadoopTestTaskContext mapCtx = new GridHadoopTestTaskContext(taskInfo, gridJob); + HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob); mapCtx.run(); //Prepare input for combine taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); - GridHadoopTestTaskContext combineCtx = new GridHadoopTestTaskContext(taskInfo, gridJob); + HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob); combineCtx.makeTreeOfWritables(mapCtx.mockOutput()); @@ -228,16 +228,16 @@ abstract class GridHadoopTasksAllVersionsTest extends GridHadoopAbstractWordCoun GridHadoopFileBlock fileBlock1 = new GridHadoopFileBlock(HOSTS, inFileUri, 0, l); GridHadoopFileBlock fileBlock2 = new GridHadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); - GridHadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); - GridHadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); + HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); - GridHadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); + HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); //Prepare input for combine GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); - GridHadoopTestTaskContext reduceCtx = new GridHadoopTestTaskContext(taskInfo, gridJob); + HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob); 
reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput()); reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java index c6b10bd..679be71 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV1Test.java @@ -38,7 +38,7 @@ public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest { * @return Hadoop job. * @throws IOException If fails. */ - @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile); setupFileSystems(jobConf); @@ -47,7 +47,7 @@ public class GridHadoopTasksV1Test extends GridHadoopTasksAllVersionsTest { GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); - return new GridHadoopV2Job(jobId, jobInfo, log); + return new HadoopV2Job(jobId, jobInfo, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java index 13dd688..4d20b9c 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTasksV2Test.java @@ -42,7 +42,7 @@ public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest { * @return Hadoop job. * @throws Exception if fails. */ - @Override public GridHadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { Job job = Job.getInstance(); job.setOutputKeyClass(Text.class); @@ -65,7 +65,7 @@ public class GridHadoopTasksV2Test extends GridHadoopTasksAllVersionsTest { GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); - return new GridHadoopV2Job(jobId, jobInfo, log); + return new HadoopV2Job(jobId, jobInfo, log); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java deleted file mode 100644 index 80b00a6..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopTestTaskContext.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.util.*; - -/** - * Context for test purpose. - */ -class GridHadoopTestTaskContext extends GridHadoopV2TaskContext { - /** - * Simple key-vale pair. - * @param <K> Key class. - * @param <V> Value class. - */ - public static class Pair<K,V> { - /** Key */ - private K key; - - /** Value */ - private V val; - - /** - * @param key key. - * @param val value. - */ - Pair(K key, V val) { - this.key = key; - this.val = val; - } - - /** - * Getter of key. - * @return key. - */ - K key() { - return key; - } - - /** - * Getter of value. - * @return value. - */ - V value() { - return val; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return key + "," + val; - } - } - - /** Mock output container- result data of task execution if it is not overridden. */ - private List<Pair<String, Integer>> mockOutput = new ArrayList<>(); - - /** Mock input container- input data if it is not overridden. */ - private Map<Object,List> mockInput = new TreeMap<>(); - - /** Context output implementation to write data into mockOutput. 
*/ - private GridHadoopTaskOutput output = new GridHadoopTaskOutput() { - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) { - //Check of casting and extract/copy values - String strKey = new String(((Text)key).getBytes()); - int intVal = ((IntWritable)val).get(); - - mockOutput().add(new Pair<>(strKey, intVal)); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new UnsupportedOperationException(); - } - }; - - /** Context input implementation to read data from mockInput. */ - private GridHadoopTaskInput input = new GridHadoopTaskInput() { - /** Iterator of keys and associated lists of values. */ - Iterator<Map.Entry<Object, List>> iter; - - /** Current key and associated value list. */ - Map.Entry<Object, List> currEntry; - - /** {@inheritDoc} */ - @Override public boolean next() { - if (iter == null) - iter = mockInput().entrySet().iterator(); - - if (iter.hasNext()) - currEntry = iter.next(); - else - currEntry = null; - - return currEntry != null; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return currEntry.getKey(); - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - return currEntry.getValue().iterator() ; - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new UnsupportedOperationException(); - } - }; - - /** - * Getter of mock output container - result of task if it is not overridden. - * - * @return mock output. - */ - public List<Pair<String, Integer>> mockOutput() { - return mockOutput; - } - - /** - * Getter of mock input container- input data if it is not overridden. - * - * @return mock output. - */ - public Map<Object, List> mockInput() { - return mockInput; - } - - /** - * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects. - * The result is placed into mock input. - * - * @param flatData list of key-value pair. 
- */ - public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) { - Text key = new Text(); - - for (GridHadoopTestTaskContext.Pair<String, Integer> pair : flatData) { - key.set(pair.key); - ArrayList<IntWritable> valList; - - if (!mockInput.containsKey(key)) { - valList = new ArrayList<>(); - mockInput.put(key, valList); - key = new Text(); - } - else - valList = (ArrayList<IntWritable>) mockInput.get(key); - valList.add(new IntWritable(pair.value())); - } - } - - /** - * @param taskInfo Task info. - * @param gridJob Grid Hadoop job. - */ - public GridHadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException { - super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); - } - - /** - * Creates DataInput to read JobConf. - * - * @param job Job. - * @return DataInput with JobConf. - * @throws IgniteCheckedException If failed. - */ - private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException { - JobConf jobConf = new JobConf(); - - for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet()) - jobConf.set(e.getKey(), e.getValue()); - - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - try { - jobConf.write(new DataOutputStream(buf)); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } - - return new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskOutput output() { - return output; - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input() { - return input; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java deleted file mode 100644 index c7a456b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopV2JobSelfTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.serializer.*; -import org.apache.hadoop.mapred.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; - -/** - * Self test of {@link GridHadoopV2Job}. - */ -public class GridHadoopV2JobSelfTest extends GridHadoopAbstractSelfTest { - /** */ - private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; - - /** - * Custom serialization class that accepts {@link Writable}. 
- */ - private static class CustomSerialization extends WritableSerialization { - /** {@inheritDoc} */ - @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) { - return new Deserializer<Writable>() { - @Override public void open(InputStream in) { } - - @Override public Writable deserialize(Writable writable) { - return new Text(TEST_SERIALIZED_VALUE); - } - - @Override public void close() { } - }; - } - } - - /** - * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration. - * - * @throws IgniteCheckedException If fails. - */ - public void testCustomSerializationApplying() throws IgniteCheckedException { - JobConf cfg = new JobConf(); - - cfg.setMapOutputKeyClass(IntWritable.class); - cfg.setMapOutputValueClass(Text.class); - cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - - GridHadoopJob job = new GridHadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); - - GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0, - null)); - - GridHadoopSerialization ser = taskCtx.keySerialization(); - - assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName()); - - DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0])); - - assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); - - ser = taskCtx.valueSerialization(); - - assertEquals(GridHadoopSerializationWrapper.class.getName(), ser.getClass().getName()); - - assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java ---------------------------------------------------------------------- diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java new file mode 100644 index 0000000..116248f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * Test of wrapper of the native serialization. + */ +public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest { + /** + * Tests read/write of IntWritable via native WritableSerialization. + * @throws Exception If fails. 
+ */ + public void testIntWritableSerialization() throws Exception { + GridHadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + DataOutput out = new DataOutputStream(buf); + + ser.write(out, new IntWritable(3)); + ser.write(out, new IntWritable(-5)); + + assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray())); + + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + + assertEquals(3, ((IntWritable)ser.read(in, null)).get()); + assertEquals(-5, ((IntWritable)ser.read(in, null)).get()); + } + + /** + * Tests read/write of Integer via native JavaleSerialization. + * @throws Exception If fails. + */ + public void testIntJavaSerialization() throws Exception { + GridHadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + DataOutput out = new DataOutputStream(buf); + + ser.write(out, 3); + ser.write(out, -5); + ser.close(); + + DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + + assertEquals(3, ((Integer)ser.read(in, null)).intValue()); + assertEquals(-5, ((Integer)ser.read(in, null)).intValue()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java new file mode 100644 index 0000000..040730b --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java @@ -0,0 +1,68 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}. + */ +public class HadoopSplitWrapperSelfTest extends GridHadoopAbstractSelfTest { + /** + * Tests serialization of wrapper and the wrapped native split. + * @throws Exception If fails. 
+ */ + public void testSerialization() throws Exception { + FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"}); + + assertEquals("/path/to/file:100+500", nativeSplit.toString()); + + HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations()); + + assertEquals("[host1, host2]", Arrays.toString(split.hosts())); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + ObjectOutput out = new ObjectOutputStream(buf); + + out.writeObject(split); + + ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray())); + + final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject(); + + assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString()); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + res.hosts(); + + return null; + } + }, AssertionError.class, null); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java new file mode 100644 index 0000000..9b56300 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.util.*; + +/** + * Context for test purpose. + */ +class HadoopTestTaskContext extends HadoopV2TaskContext { + /** + * Simple key-vale pair. + * @param <K> Key class. + * @param <V> Value class. + */ + public static class Pair<K,V> { + /** Key */ + private K key; + + /** Value */ + private V val; + + /** + * @param key key. + * @param val value. + */ + Pair(K key, V val) { + this.key = key; + this.val = val; + } + + /** + * Getter of key. + * @return key. + */ + K key() { + return key; + } + + /** + * Getter of value. + * @return value. + */ + V value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return key + "," + val; + } + } + + /** Mock output container- result data of task execution if it is not overridden. */ + private List<Pair<String, Integer>> mockOutput = new ArrayList<>(); + + /** Mock input container- input data if it is not overridden. */ + private Map<Object,List> mockInput = new TreeMap<>(); + + /** Context output implementation to write data into mockOutput. 
*/ + private GridHadoopTaskOutput output = new GridHadoopTaskOutput() { + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) { + //Check of casting and extract/copy values + String strKey = new String(((Text)key).getBytes()); + int intVal = ((IntWritable)val).get(); + + mockOutput().add(new Pair<>(strKey, intVal)); + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new UnsupportedOperationException(); + } + }; + + /** Context input implementation to read data from mockInput. */ + private GridHadoopTaskInput input = new GridHadoopTaskInput() { + /** Iterator of keys and associated lists of values. */ + Iterator<Map.Entry<Object, List>> iter; + + /** Current key and associated value list. */ + Map.Entry<Object, List> currEntry; + + /** {@inheritDoc} */ + @Override public boolean next() { + if (iter == null) + iter = mockInput().entrySet().iterator(); + + if (iter.hasNext()) + currEntry = iter.next(); + else + currEntry = null; + + return currEntry != null; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return currEntry.getKey(); + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + return currEntry.getValue().iterator() ; + } + + /** {@inheritDoc} */ + @Override public void close() { + throw new UnsupportedOperationException(); + } + }; + + /** + * Getter of mock output container - result of task if it is not overridden. + * + * @return mock output. + */ + public List<Pair<String, Integer>> mockOutput() { + return mockOutput; + } + + /** + * Getter of mock input container- input data if it is not overridden. + * + * @return mock output. + */ + public Map<Object, List> mockInput() { + return mockInput; + } + + /** + * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects. + * The result is placed into mock input. + * + * @param flatData list of key-value pair. 
+ */ + public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) { + Text key = new Text(); + + for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) { + key.set(pair.key); + ArrayList<IntWritable> valList; + + if (!mockInput.containsKey(key)) { + valList = new ArrayList<>(); + mockInput.put(key, valList); + key = new Text(); + } + else + valList = (ArrayList<IntWritable>) mockInput.get(key); + valList.add(new IntWritable(pair.value())); + } + } + + /** + * @param taskInfo Task info. + * @param gridJob Grid Hadoop job. + */ + public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException { + super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); + } + + /** + * Creates DataInput to read JobConf. + * + * @param job Job. + * @return DataInput with JobConf. + * @throws IgniteCheckedException If failed. + */ + private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException { + JobConf jobConf = new JobConf(); + + for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); + + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + try { + jobConf.write(new DataOutputStream(buf)); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return new DataInputStream(new ByteArrayInputStream(buf.toByteArray())); + } + + /** {@inheritDoc} */ + @Override public GridHadoopTaskOutput output() { + return output; + } + + /** {@inheritDoc} */ + @Override public GridHadoopTaskInput input() { + return input; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java new file mode 100644 index 0000000..66e35b5 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.io.serializer.*; +import org.apache.hadoop.mapred.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}. + */ +public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest { + /** */ + private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; + + /** + * Custom serialization class that accepts {@link Writable}. 
+ */ + private static class CustomSerialization extends WritableSerialization { + /** {@inheritDoc} */ + @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) { + return new Deserializer<Writable>() { + @Override public void open(InputStream in) { } + + @Override public Writable deserialize(Writable writable) { + return new Text(TEST_SERIALIZED_VALUE); /* Ignores its argument and always yields the marker value. */ + } + + @Override public void close() { } + }; + } + } + + /** + * Tests that {@link GridHadoopJob} provides a wrapped serializer for both keys and values if a custom serialization is set in configuration. + * + * @throws IgniteCheckedException If failed. + */ + public void testCustomSerializationApplying() throws IgniteCheckedException { + JobConf cfg = new JobConf(); + + cfg.setMapOutputKeyClass(IntWritable.class); + cfg.setMapOutputValueClass(Text.class); + cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); + + GridHadoopJob job = new HadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + + GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0, + null)); + + GridHadoopSerialization ser = taskCtx.keySerialization(); + + assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); + + DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0])); /* Empty input: the custom deserializer above returns a constant without reading. */ + + assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); + + ser = taskCtx.valueSerialization(); + + assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); + + assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java index 716fe19..aa0ddc1 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java @@ -60,12 +60,12 @@ public abstract class GridHadoopAbstractMapTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException { - return new GridHadoopWritableSerialization(IntWritable.class); + return new HadoopWritableSerialization(IntWritable.class); } /** {@inheritDoc} */ @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException { - return new GridHadoopWritableSerialization(IntWritable.class); + return new HadoopWritableSerialization(IntWritable.class); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java deleted file mode 100644 index dd3c5d4..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java +++ /dev/null @@ 
-1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Tests Hadoop external communication component. - */ -public class GridHadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. - */ - public void testSimpleMessageSendingTcp() throws Exception { - checkSimpleMessageSending(false); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleMessageSendingShmem() throws Exception { - checkSimpleMessageSending(true); - } - - /** - * @throws Exception If failed. 
- */ - private void checkSimpleMessageSending(boolean useShmem) throws Exception { - UUID parentNodeId = UUID.randomUUID(); - - Marshaller marsh = new OptimizedMarshaller(); - - IgniteLogger log = log(); - - GridHadoopExternalCommunication[] comms = new GridHadoopExternalCommunication[4]; - - try { - String name = "grid"; - - TestHadoopListener[] lsnrs = new TestHadoopListener[4]; - - int msgs = 10; - - for (int i = 0; i < comms.length; i++) { - comms[i] = new GridHadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, - Executors.newFixedThreadPool(1), name + i); - - if (useShmem) - comms[i].setSharedMemoryPort(14000); - - lsnrs[i] = new TestHadoopListener(msgs); - - comms[i].setListener(lsnrs[i]); - - comms[i].start(); - } - - for (int r = 0; r < msgs; r++) { - for (int from = 0; from < comms.length; from++) { - for (int to = 0; to < comms.length; to++) { - if (from == to) - continue; - - comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); - } - } - } - - U.sleep(1000); - - for (TestHadoopListener lsnr : lsnrs) { - lsnr.await(3_000); - - assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); - } - } - finally { - for (GridHadoopExternalCommunication comm : comms) { - if (comm != null) - comm.stop(); - } - } - } - - /** - * - */ - private static class TestHadoopListener implements GridHadoopMessageListener { - /** Received messages (array list is safe because executor has one thread). */ - private Collection<TestMessage> msgs = new ArrayList<>(); - - /** Await latch. */ - private CountDownLatch receiveLatch; - - /** - * @param msgs Number of messages to await. 
- */ - private TestHadoopListener(int msgs) { - receiveLatch = new CountDownLatch(msgs); - } - - /** {@inheritDoc} */ - @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { - assert msg instanceof TestMessage; - - msgs.add((TestMessage)msg); - - receiveLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override public void onConnectionLost(HadoopProcessDescriptor desc) { - // No-op. - } - - /** - * @return Received messages. - */ - public Collection<TestMessage> messages() { - return msgs; - } - - /** - * @param millis Time to await. - * @throws InterruptedException If wait interrupted. - */ - public void await(int millis) throws InterruptedException { - receiveLatch.await(millis, TimeUnit.MILLISECONDS); - } - } - - /** - * - */ - private static class TestMessage implements HadoopMessage { - /** From index. */ - private int from; - - /** To index. */ - private int to; - - /** - * @param from From index. - * @param to To index. - */ - private TestMessage(int from, int to) { - this.from = from; - this.to = to; - } - - /** - * Required by {@link Externalizable}. - */ - public TestMessage() { - // No-op. - } - - /** - * @return From index. - */ - public int from() { - return from; - } - - /** - * @return To index. 
- */ - public int to() { - return to; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(from); - out.writeInt(to); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - from = in.readInt(); - to = in.readInt(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java new file mode 100644 index 0000000..a21633d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests Hadoop external communication component. + */ +public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingTcp() throws Exception { + checkSimpleMessageSending(false); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingShmem() throws Exception { + checkSimpleMessageSending(true); + } + + /** + * @throws Exception If failed. 
+ */ + private void checkSimpleMessageSending(boolean useShmem) throws Exception { + UUID parentNodeId = UUID.randomUUID(); + + Marshaller marsh = new OptimizedMarshaller(); + + IgniteLogger log = log(); + + HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4]; + + try { + String name = "grid"; + + TestHadoopListener[] lsnrs = new TestHadoopListener[4]; + + int msgs = 10; + + for (int i = 0; i < comms.length; i++) { + comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, + Executors.newFixedThreadPool(1), name + i); + + if (useShmem) + comms[i].setSharedMemoryPort(14000); + + lsnrs[i] = new TestHadoopListener(msgs * (comms.length - 1)); /* Every other component sends msgs messages to this one, so await all of them. */ + + comms[i].setListener(lsnrs[i]); + + comms[i].start(); + } + + for (int r = 0; r < msgs; r++) { + for (int from = 0; from < comms.length; from++) { + for (int to = 0; to < comms.length; to++) { + if (from == to) + continue; + + comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); + } + } + } + + U.sleep(1000); + + for (TestHadoopListener lsnr : lsnrs) { + lsnr.await(3_000); + + assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); + } + } + finally { + for (HadoopExternalCommunication comm : comms) { + if (comm != null) + comm.stop(); + } + } + } + + /** + * Listener that stores received test messages and counts down a latch for each of them. + */ + private static class TestHadoopListener implements HadoopMessageListener { + /** Received messages (array list is safe because executor has one thread). */ + private Collection<TestMessage> msgs = new ArrayList<>(); + + /** Await latch. */ + private CountDownLatch receiveLatch; + + /** + * @param msgs Number of messages to await.
+ */ + private TestHadoopListener(int msgs) { + receiveLatch = new CountDownLatch(msgs); + } + + /** {@inheritDoc} */ + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { + assert msg instanceof TestMessage; + + msgs.add((TestMessage)msg); + + receiveLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + // No-op. + } + + /** + * @return Received messages. + */ + public Collection<TestMessage> messages() { + return msgs; + } + + /** + * @param millis Maximum time to wait for the latch, in milliseconds; a timeout is not reported to the caller. + * @throws InterruptedException If the wait is interrupted. + */ + public void await(int millis) throws InterruptedException { + receiveLatch.await(millis, TimeUnit.MILLISECONDS); + } + } + + /** + * Test message carrying a "from" index and a "to" index. + */ + private static class TestMessage implements HadoopMessage { + /** From index. */ + private int from; + + /** To index. */ + private int to; + + /** + * @param from From index. + * @param to To index. + */ + private TestMessage(int from, int to) { + this.from = from; + this.to = to; + } + + /** + * Required by {@link Externalizable}. + */ + public TestMessage() { + // No-op. + } + + /** + * @return From index. + */ + public int from() { + return from; + } + + /** + * @return To index.
+ */ + public int to() { + return to; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(from); + out.writeInt(to); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + from = in.readInt(); + to = in.readInt(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 822ab8f..1413c7e 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -94,10 +94,10 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTaskExecutionSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopV2JobSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopV2JobSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSerializationWrapperSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSplitWrapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSerializationWrapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSplitWrapperSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV1Test.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV2Test.class.getName()))); @@ -107,7 +107,7 @@ public class IgniteHadoopTestSuite extends TestSuite { suite.addTest(new 
TestSuite(ldr.loadClass(GridHadoopMapReduceEmbeddedSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalTaskExecutionSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalCommunicationSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSortingTest.class.getName())));