http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
index 040730b..ee490be 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.*;
 /**
  * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}.
  */
-public class HadoopSplitWrapperSelfTest extends GridHadoopAbstractSelfTest {
+public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
     /**
      * Tests serialization of wrapper and the wrapped native split.
      * @throws Exception If fails.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java new file mode 100644 index 0000000..a489f28 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests map-reduce task execution basics. + */ +public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { + /** */ + private static GridHadoopSharedMap m = GridHadoopSharedMap.map(HadoopTaskExecutionSelfTest.class); + + /** Line count. */ + private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger()); + + /** Executed tasks. */ + private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger()); + + /** Cancelled tasks. */ + private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger()); + + /** Working directory of each task. */ + private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs", + new ConcurrentHashMap<String, String>()); + + /** Mapper id to fail. */ + private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger()); + + /** Number of splits of the current input. */ + private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger()); + + /** Test param. 
*/ + private static final String MAP_WRITE = "test.map.write"; + + + /** {@inheritDoc} */ + @Override public IgfsConfiguration igfsConfiguration() { + IgfsConfiguration cfg = super.igfsConfiguration(); + + cfg.setFragmentizerEnabled(false); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + grid(0).fileSystem(igfsName).format(); + } + + /** {@inheritDoc} */ + @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { + GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setMaxParallelTasks(5); + cfg.setExternalExecution(false); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testMapRun() throws Exception { + int lineCnt = 10000; + String fileName = "/testFile"; + + prepareFile(fileName, lineCnt); + + totalLineCnt.set(0); + taskWorkDirs.clear(); + + Configuration cfg = new Configuration(); + + cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestMapper.class); + + job.setNumReduceTasks(0); + + job.setInputFormatClass(TextInputFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + + assertEquals(lineCnt, totalLineCnt.get()); + + assertEquals(32, taskWorkDirs.size()); + } + + /** + * @throws Exception If failed. + */ + public void testMapCombineRun() throws Exception { + int lineCnt = 10001; + String fileName = "/testFile"; + + prepareFile(fileName, lineCnt); + + totalLineCnt.set(0); + taskWorkDirs.clear(); + + Configuration cfg = new Configuration(); + + cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + cfg.setBoolean(MAP_WRITE, true); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(TestMapper.class); + job.setCombinerClass(TestCombiner.class); + job.setReducerClass(TestReducer.class); + + job.setNumReduceTasks(2); + + job.setInputFormatClass(TextInputFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 2); + + IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration())); + + fut.get(); + + assertEquals(lineCnt, totalLineCnt.get()); + + assertEquals(34, taskWorkDirs.size()); + + for (int g = 0; g < gridCount(); g++) + grid(g).hadoop().finishFuture(jobId).get(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMapperException() throws Exception { + prepareFile("/testFile", 1000); + + Configuration cfg = new Configuration(); + + cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName()); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(FailMapper.class); + + job.setNumReduceTasks(0); + + job.setInputFormatClass(TextInputFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); + + job.setJarByClass(getClass()); + + final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 3), + createJobInfo(job.getConfiguration())); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + } + + /** + * @param fileName File name. + * @param lineCnt Line count. + * @throws Exception If failed. + */ + private void prepareFile(String fileName, int lineCnt) throws Exception { + IgniteFs igfs = grid(0).fileSystem(igfsName); + + try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) { + PrintWriter w = new PrintWriter(new OutputStreamWriter(os)); + + for (int i = 0; i < lineCnt; i++) + w.print("Hello, Hadoop map-reduce!\n"); + + w.flush(); + } + } + + /** + * Prepare job with mappers to cancel. + * @return Fully configured job. + * @throws Exception If fails. + */ + private Configuration prepareJobForCancelling() throws Exception { + prepareFile("/testFile", 1500); + + executedTasks.set(0); + cancelledTasks.set(0); + failMapperId.set(0); + splitsCount.set(0); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setMapperClass(CancellingTestMapper.class); + + job.setNumReduceTasks(0); + + job.setInputFormatClass(InFormat.class); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/")); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/")); + + job.setJarByClass(getClass()); + + return job.getConfiguration(); + } + + /** + * Test input format. + */ + private static class InFormat extends TextInputFormat { + @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException { + List<InputSplit> res = super.getSplits(ctx); + + splitsCount.set(res.size()); + + X.println("___ split of input: " + splitsCount.get()); + + return res; + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testTaskCancelling() throws Exception { + Configuration cfg = prepareJobForCancelling(); + + GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); + + final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg)); + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return splitsCount.get() > 0; + } + }, 20000)) { + U.dumpThreads(log); + + assertTrue(false); + } + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return executedTasks.get() == splitsCount.get(); + } + }, 20000)) { + U.dumpThreads(log); + + assertTrue(false); + } + + // Fail mapper with id "1", cancels others + failMapperId.set(1); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + + assertEquals(executedTasks.get(), cancelledTasks.get() + 1); + } + + /** + * @throws Exception If failed. + */ + public void testJobKill() throws Exception { + Configuration cfg = prepareJobForCancelling(); + + Hadoop hadoop = grid(0).hadoop(); + + GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1); + + //Kill unknown job. + boolean killRes = hadoop.kill(jobId); + + assertFalse(killRes); + + final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg)); + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return splitsCount.get() > 0; + } + }, 20000)) { + U.dumpThreads(log); + + assertTrue(false); + } + + if (!GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + X.println("___ executed tasks: " + executedTasks.get()); + + return executedTasks.get() == splitsCount.get(); + } + }, 20000)) { + U.dumpThreads(log); + + fail(); + } + + //Kill really ran job. + killRes = hadoop.kill(jobId); + + assertTrue(killRes); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + fut.get(); + + return null; + } + }, IgniteCheckedException.class, null); + + assertEquals(executedTasks.get(), cancelledTasks.get()); + + //Kill the same job again. + killRes = hadoop.kill(jobId); + + assertTrue(killRes); + } + + private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> { + private int mapperId; + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + mapperId = executedTasks.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void run(Context ctx) throws IOException, InterruptedException { + try { + super.run(ctx); + } + catch (HadoopTaskCancelledException e) { + cancelledTasks.incrementAndGet(); + + throw e; + } + } + + /** {@inheritDoc} */ + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + if (mapperId == failMapperId.get()) + throw new IOException(); + + Thread.sleep(1000); + } + } + + /** + * Test failing mapper. + */ + private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> { + /** {@inheritDoc} */ + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + throw new IOException("Expected"); + } + } + + /** + * Mapper calculates number of lines. 
+ */ + private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> { + /** Writable integer constant of '1'. */ + private static final IntWritable ONE = new IntWritable(1); + + /** Line count constant. */ + public static final Text LINE_COUNT = new Text("lineCount"); + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + X.println("___ Mapper: " + ctx.getTaskAttemptID()); + + String taskId = ctx.getTaskAttemptID().toString(); + + LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration()); + + String workDir = locFs.getWorkingDirectory().toString(); + + assertNull(taskWorkDirs.put(workDir, taskId)); + } + + /** {@inheritDoc} */ + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + if (ctx.getConfiguration().getBoolean(MAP_WRITE, false)) + ctx.write(LINE_COUNT, ONE); + else + totalLineCnt.incrementAndGet(); + } + } + + /** + * Combiner calculates number of lines. + */ + private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { + /** */ + IntWritable sum = new IntWritable(); + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + X.println("___ Combiner: "); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, + InterruptedException { + int lineCnt = 0; + + for (IntWritable value : values) + lineCnt += value.get(); + + sum.set(lineCnt); + + X.println("___ combo: " + lineCnt); + + ctx.write(key, sum); + } + } + + /** + * Combiner calculates number of lines. + */ + private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> { + /** */ + IntWritable sum = new IntWritable(); + + /** {@inheritDoc} */ + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + X.println("___ Reducer: " + ctx.getTaskAttemptID()); + + String taskId = ctx.getTaskAttemptID().toString(); + String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString(); + + assertNull(taskWorkDirs.put(workDir, taskId)); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, + InterruptedException { + int lineCnt = 0; + + for (IntWritable value : values) { + lineCnt += value.get(); + + X.println("___ rdcr: " + value.get()); + } + + sum.set(lineCnt); + + ctx.write(key, sum); + + X.println("___ RDCR SUM: " + lineCnt); + + totalLineCnt.addAndGet(lineCnt); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java new file mode 100644 index 0000000..265890d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import com.google.common.base.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Tests of Map, Combine and Reduce task executions of any version of hadoop API. + */ +abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest { + /** Empty hosts array. */ + private static final String[] HOSTS = new String[0]; + + /** + * Creates some grid hadoop job. Override this method to create tests for any job implementation. + * + * @param inFile Input file name for the job. + * @param outFile Output file name for the job. + * @return Hadoop job. + * @throws IOException If fails. + */ + public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception; + + /** + * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API + */ + public abstract String getOutputFileNamePrefix(); + + /** + * Tests map task execution. + * + * @throws Exception If fails. + */ + @SuppressWarnings("ConstantConditions") + public void testMapTask() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + + URI inFileUri = URI.create(igfsScheme() + inFile.toString()); + + try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) { + pw.println("hello0 world0"); + pw.println("world1 hello1"); + } + + HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1); + + try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) { + pw.println("hello2 world2"); + pw.println("world3 hello3"); + } + HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(), + igfs.info(inFile).length() - fileBlock1.length()); + + HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT); + + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1); + + HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); + + ctx.mockOutput().clear(); + + ctx.run(); + + assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput())); + + ctx.mockOutput().clear(); + + ctx.taskInfo(new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2)); + + ctx.run(); + + assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput())); + } + + /** + * Generates input data for reduce-like operation into mock context input and runs the operation. + * + * @param gridJob Job is to create reduce task from. 
+ * @param taskType Type of task - combine or reduce. + * @param taskNum Number of task in job. + * @param words Pairs of words and its counts. + * @return Context with mock output. + * @throws IgniteCheckedException If fails. + */ + private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType, + int taskNum, String... words) throws IgniteCheckedException { + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null); + + HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob); + + for (int i = 0; i < words.length; i+=2) { + List<IntWritable> valList = new ArrayList<>(); + + for (int j = 0; j < Integer.parseInt(words[i + 1]); j++) + valList.add(new IntWritable(1)); + + ctx.mockInput().put(new Text(words[i]), valList); + } + + ctx.run(); + + return ctx; + } + + /** + * Tests reduce task execution. + * + * @throws Exception If fails. + */ + public void testReduceTask() throws Exception { + HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT); + + runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10"); + runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15"); + + assertEquals( + "word1\t5\n" + + "word2\t10\n", + readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" + + getOutputFileNamePrefix() + "00000") + ); + + assertEquals( + "word3\t7\n" + + "word4\t15\n", + readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" + + getOutputFileNamePrefix() + "00001") + ); + } + + /** + * Tests combine task execution. + * + * @throws Exception If fails. + */ + public void testCombinerTask() throws Exception { + HadoopV2Job gridJob = getHadoopJob("/", "/"); + + HadoopTestTaskContext ctx = + runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10"); + + assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput())); + + ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15"); + + assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput())); + } + + /** + * Runs chain of map-combine task on file block. + * + * @param fileBlock block of input file to be processed. + * @param gridJob Hadoop job implementation. + * @return Context of combine task with mock output. + * @throws IgniteCheckedException If fails. + */ + private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob) + throws IgniteCheckedException { + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock); + + HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob); + + mapCtx.run(); + + //Prepare input for combine + taskInfo = new GridHadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null); + + HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob); + + combineCtx.makeTreeOfWritables(mapCtx.mockOutput()); + + combineCtx.run(); + + return combineCtx; + } + + /** + * Tests all job in complex. + * Runs 2 chains of map-combine tasks and sends result into one reduce task. + * + * @throws Exception If fails. 
+ */ + @SuppressWarnings("ConstantConditions") + public void testAllTasks() throws Exception { + IgfsPath inDir = new IgfsPath(PATH_INPUT); + + igfs.mkdirs(inDir); + + IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input"); + + URI inFileUri = URI.create(igfsScheme() + inFile.toString()); + + generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70); + + //Split file into two blocks + long fileLen = igfs.info(inFile).length(); + + Long l = fileLen / 2; + + HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l); + HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l); + + HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT); + + HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob); + + HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob); + + //Prepare input for combine + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null); + + HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob); + + reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput()); + reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput()); + + reduceCtx.run(); + + reduceCtx.taskInfo(new GridHadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null)); + + reduceCtx.run(); + + assertEquals( + "blue\t200\n" + + "green\t150\n" + + "red\t100\n" + + "yellow\t70\n", + readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000") + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java new file mode 100644 index 0000000..d932a8f --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.hadoop.mapred.*; +import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1. 
 */
+public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v1.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws IOException If fails.
+     */
+    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+        JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile);
+
+        setupFileSystems(jobConf);
+
+        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+
+        GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
+
+        return new HadoopV2Job(jobId, jobInfo, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
new file mode 100644
index 0000000..2e1e9fd
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2.
+ */
+public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v2.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws Exception if fails.
+ */ + @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception { + Job job = Job.getInstance(); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + GridHadoopWordCount2.setTasksClasses(job, true, true, true); + + Configuration conf = job.getConfiguration(); + + setupFileSystems(conf); + + FileInputFormat.setInputPaths(job, new Path(inFile)); + FileOutputFormat.setOutputPath(job, new Path(outFile)); + + job.setJarByClass(GridHadoopWordCount2.class); + + Job hadoopJob = GridHadoopWordCount2.getJob(inFile, outFile); + + HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration()); + + GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0); + + return new HadoopV2Job(jobId, jobInfo, log); + } + + /** {@inheritDoc} */ + @Override public String getOutputFileNamePrefix() { + return "part-r-"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java index 9b56300..c3c8806 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java @@ -79,7 +79,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext { private Map<Object,List> mockInput = new TreeMap<>(); /** Context output implementation to write data into mockOutput. */ - private GridHadoopTaskOutput output = new GridHadoopTaskOutput() { + private HadoopTaskOutput output = new HadoopTaskOutput() { /** {@inheritDoc} */ @Override public void write(Object key, Object val) { //Check of casting and extract/copy values @@ -96,7 +96,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext { }; /** Context input implementation to read data from mockInput. */ - private GridHadoopTaskInput input = new GridHadoopTaskInput() { + private HadoopTaskInput input = new HadoopTaskInput() { /** Iterator of keys and associated lists of values. */ Iterator<Map.Entry<Object, List>> iter; @@ -178,7 +178,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext { * @param taskInfo Task info. * @param gridJob Grid Hadoop job. */ - public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException { + public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException { super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob)); } @@ -189,7 +189,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext { * @return DataInput with JobConf. * @throws IgniteCheckedException If failed. 
*/ - private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException { + private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException { JobConf jobConf = new JobConf(); for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet()) @@ -208,12 +208,12 @@ class HadoopTestTaskContext extends HadoopV2TaskContext { } /** {@inheritDoc} */ - @Override public GridHadoopTaskOutput output() { + @Override public HadoopTaskOutput output() { return output; } /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input() { + @Override public HadoopTaskInput input() { return input; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index 66e35b5..222ba17 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -32,7 +32,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}. */ -public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest { +public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { /** */ private static final String TEST_SERIALIZED_VALUE = "Test serialized value"; @@ -55,7 +55,7 @@ public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest { } /** - * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration. + * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration. * * @throws IgniteCheckedException If fails. 
*/ @@ -66,12 +66,12 @@ public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest { cfg.setMapOutputValueClass(Text.class); cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName()); - GridHadoopJob job = new HadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); + HadoopJob job = new HadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log); - GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0, + HadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); - GridHadoopSerialization ser = taskCtx.keySerialization(); + HadoopSerialization ser = taskCtx.keySerialization(); assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java new file mode 100644 index 0000000..558dec5 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.configuration.*; + +/** + * Configuration validation tests. + */ +public class HadoopValidationSelfTest extends HadoopAbstractSelfTest { + /** Peer class loading enabled flag. */ + public boolean peerClassLoading; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + + peerClassLoading = false; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(peerClassLoading); + + return cfg; + } + + /** + * Ensure that Grid starts when all configuration parameters are valid. + * + * @throws Exception If failed. 
+ */ + public void testValid() throws Exception { + startGrids(1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java deleted file mode 100644 index aa0ddc1..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import org.apache.commons.collections.comparators.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Abstract class for maps test. - */ -public abstract class GridHadoopAbstractMapTest extends GridCommonAbstractTest { - /** - * Test task context. 
- */ - protected static class TaskContext extends GridHadoopTaskContext { - /** - */ - protected TaskContext() { - super(null, null); - } - - /** {@inheritDoc} */ - @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopCounters counters() { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException { - assert false; - - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException { - return new HadoopWritableSerialization(IntWritable.class); - } - - /** {@inheritDoc} */ - @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException { - return new HadoopWritableSerialization(IntWritable.class); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> sortComparator() { - return ComparableComparator.getInstance(); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> groupComparator() { - return ComparableComparator.getInstance(); - } - - /** {@inheritDoc} */ - @Override public void run() throws IgniteCheckedException { - assert false; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - assert false; - } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment() throws IgniteCheckedException { - assert false; - } - - @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { - assert false; - } - } - - /** - * Test job info. - */ - protected static class JobInfo implements GridHadoopJobInfo { - /** {@inheritDoc} */ - @Nullable @Override public String property(String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean hasCombiner() { - assert false; - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean hasReducer() { - assert false; - - return false; - } - - /** {@inheritDoc} */ - @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { - assert false; - - return null; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - assert false; - - return 0; - } - - /** {@inheritDoc} */ - @Override public String jobName() { - assert false; - - return null; - } - - /** {@inheritDoc} */ - @Override public String user() { - assert false; - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java index 8b5d2b6..43b1f02 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java @@ -34,7 +34,7 @@ import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; /** * */ -public class GridHadoopConcurrentHashMultimapSelftest extends 
GridHadoopAbstractMapTest { +public class GridHadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest { /** */ public void testMapSimple() throws Exception { GridUnsafeMemory mem = new GridUnsafeMemory(0); @@ -52,7 +52,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract GridHadoopJobInfo job = new JobInfo(); - GridHadoopTaskContext taskCtx = new TaskContext(); + HadoopTaskContext taskCtx = new TaskContext(); HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize); @@ -91,8 +91,8 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract } private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm, - final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) throws Exception { - final GridHadoopTaskInput in = m.input(taskCtx); + final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception { + final HadoopTaskInput in = m.input(taskCtx); Map<Integer, Collection<Integer>> mmm = mm.asMap(); @@ -182,7 +182,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract for (int i = 0; i < 20; i++) { GridHadoopJobInfo job = new JobInfo(); - final GridHadoopTaskContext taskCtx = new TaskContext(); + final HadoopTaskContext taskCtx = new TaskContext(); final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16); @@ -238,7 +238,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract assertTrue(m.capacity() > 32000); - GridHadoopTaskInput in = m.input(taskCtx); + HadoopTaskInput in = m.input(taskCtx); while (in.next()) { IntWritable key = (IntWritable) in.key(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java deleted file mode 100644 index 90d957b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import com.google.common.collect.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * - */ -public class GridHadoopHashMapSelfTest extends GridHadoopAbstractMapTest { - - public void _testAllocation() throws Exception { - final GridUnsafeMemory mem = new GridUnsafeMemory(0); - - long size = 3L * 1024 * 1024 * 1024; - - final long chunk = 16;// * 1024; - - final int page = 4 * 1024; - - final int writes = chunk < page ? 1 : (int)(chunk / page); - - final long cnt = size / chunk; - - assert cnt < Integer.MAX_VALUE; - - final int threads = 4; - - long start = System.currentTimeMillis(); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - int cnt0 = (int)(cnt / threads); - - for (int i = 0; i < cnt0; i++) { - long ptr = mem.allocate(chunk); - - for (int j = 0; j < writes; j++) - mem.writeInt(ptr + j * page, 100500); - } - - return null; - } - }, threads); - - X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + mem.allocatedSize() + " cnt: " + cnt); - - Thread.sleep(30000); - } - - - /** */ - public void testMapSimple() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - -// mem.listen(new GridOffHeapEventListener() { -// @Override public void onEvent(GridOffHeapEvent evt) { -// if (evt == GridOffHeapEvent.ALLOCATE) -// U.dumpStack(); -// } -// }); - - Random rnd = new Random(); - - int mapSize = 16 << rnd.nextInt(3); - - GridHadoopTaskContext taskCtx = new TaskContext(); - - final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize); - - HadoopMultimap.Adder a = m.startAdding(taskCtx); - - Multimap<Integer, Integer> mm = ArrayListMultimap.create(); - - for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { - int key = rnd.nextInt(mapSize); - int val = rnd.nextInt(); - - a.write(new IntWritable(key), new IntWritable(val)); - mm.put(key, val); - - X.println("k: " + key + " v: " + val); - - a.close(); - - check(m, mm, taskCtx); - - a = m.startAdding(taskCtx); - } - -// a.add(new IntWritable(10), new IntWritable(2)); -// mm.put(10, 2); -// check(m, mm); - - a.close(); - - X.println("Alloc: " + mem.allocatedSize()); - - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - - private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, GridHadoopTaskContext taskCtx) throws Exception { - final GridHadoopTaskInput in = m.input(taskCtx); - - Map<Integer, Collection<Integer>> mmm = mm.asMap(); - - int keys = 0; - - while (in.next()) { - keys++; - - IntWritable k = (IntWritable)in.key(); - - assertNotNull(k); - - ArrayList<Integer> vs = new ArrayList<>(); - - Iterator<?> it = in.values(); - - while (it.hasNext()) - vs.add(((IntWritable) it.next()).get()); - - Collection<Integer> exp = mmm.get(k.get()); - - assertEquals(sorted(exp), sorted(vs)); - } - - X.println("keys: " + keys + " cap: " + m.capacity()); - - assertEquals(mmm.size(), keys); - - assertEquals(m.keys(), keys); - - in.close(); - } - - private GridLongList sorted(Collection<Integer> col) { - GridLongList lst = new GridLongList(col.size()); - - for (Integer i : col) - lst.add(i); - - return lst.sort(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java deleted file mode 100644 index 3d930ff..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import com.google.common.collect.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.lang.Math.*; -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Skip list tests. - */ -public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { - /** - * - */ - public void testLevel() { - Random rnd = new GridRandom(); - - int[] levelsCnts = new int[32]; - - int all = 10000; - - for (int i = 0; i < all; i++) { - int level = HadoopSkipList.randomLevel(rnd); - - levelsCnts[level]++; - } - - X.println("Distribution: " + Arrays.toString(levelsCnts)); - - for (int level = 0; level < levelsCnts.length; level++) { - int exp = (level + 1) == levelsCnts.length ? 
0 : all >>> (level + 1); - - double precission = 0.72 / Math.max(32 >>> level, 1); - - int sigma = max((int)ceil(precission * exp), 5); - - X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + - " sigma: " + sigma); - - assertTrue(abs(exp - levelsCnts[level]) <= sigma); - } - } - - public void testMapSimple() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - -// mem.listen(new GridOffHeapEventListener() { -// @Override public void onEvent(GridOffHeapEvent evt) { -// if (evt == GridOffHeapEvent.ALLOCATE) -// U.dumpStack(); -// } -// }); - - Random rnd = new Random(); - - int mapSize = 16 << rnd.nextInt(6); - - GridHadoopJobInfo job = new JobInfo(); - - GridHadoopTaskContext taskCtx = new TaskContext(); - - HadoopMultimap m = new HadoopSkipList(job, mem); - - HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); - - Multimap<Integer, Integer> mm = ArrayListMultimap.create(); - Multimap<Integer, Integer> vis = ArrayListMultimap.create(); - - for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { - int key = rnd.nextInt(mapSize); - int val = rnd.nextInt(); - - a.write(new IntWritable(key), new IntWritable(val)); - mm.put(key, val); - - X.println("k: " + key + " v: " + val); - - a.close(); - - check(m, mm, vis, taskCtx); - - a = m.startAdding(taskCtx); - } - -// a.add(new IntWritable(10), new IntWritable(2)); -// mm.put(10, 2); -// check(m, mm); - - a.close(); - - X.println("Alloc: " + mem.allocatedSize()); - - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - - private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) - throws Exception { - final GridHadoopTaskInput in = m.input(taskCtx); - - Map<Integer, Collection<Integer>> mmm = mm.asMap(); - - int keys = 0; - - int prevKey = Integer.MIN_VALUE; - - while (in.next()) { - keys++; - - IntWritable k = (IntWritable)in.key(); - - assertNotNull(k); - - assertTrue(k.get() > prevKey); - - prevKey = k.get(); - - Deque<Integer> vs = new LinkedList<>(); - - Iterator<?> it = in.values(); - - while (it.hasNext()) - vs.addFirst(((IntWritable) it.next()).get()); - - Collection<Integer> exp = mmm.get(k.get()); - - assertEquals(exp, vs); - } - - assertEquals(mmm.size(), keys); - -//! assertEquals(m.keys(), keys); - - // Check visitor. - - final byte[] buf = new byte[4]; - - final GridDataInput dataInput = new GridUnsafeDataInput(); - - m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { - /** */ - IntWritable key = new IntWritable(); - - /** */ - IntWritable val = new IntWritable(); - - @Override public void onKey(long keyPtr, int keySize) { - read(keyPtr, keySize, key); - } - - @Override public void onValue(long valPtr, int valSize) { - read(valPtr, valSize, val); - - vis.put(key.get(), val.get()); - } - - private void read(long ptr, int size, Writable w) { - assert size == 4 : size; - - UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); - - dataInput.bytes(buf, size); - - try { - w.readFields(dataInput); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - -// X.println("vis: " + vis); - - assertEquals(mm, vis); - - in.close(); - } - - /** - * @throws Exception if failed. 
- */ - public void testMultiThreaded() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - X.println("___ Started"); - - Random rnd = new GridRandom(); - - for (int i = 0; i < 20; i++) { - GridHadoopJobInfo job = new JobInfo(); - - final GridHadoopTaskContext taskCtx = new TaskContext(); - - final HadoopMultimap m = new HadoopSkipList(job, mem); - - final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); - - X.println("___ MT"); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - X.println("___ TH in"); - - Random rnd = new GridRandom(); - - IntWritable key = new IntWritable(); - IntWritable val = new IntWritable(); - - HadoopMultimap.Adder a = m.startAdding(taskCtx); - - for (int i = 0; i < 50000; i++) { - int k = rnd.nextInt(32000); - int v = rnd.nextInt(); - - key.set(k); - val.set(v); - - a.write(key, val); - - Collection<Integer> list = mm.get(k); - - if (list == null) { - list = new ConcurrentLinkedQueue<>(); - - Collection<Integer> old = mm.putIfAbsent(k, list); - - if (old != null) - list = old; - } - - list.add(v); - } - - a.close(); - - X.println("___ TH out"); - - return null; - } - }, 3 + rnd.nextInt(27)); - - GridHadoopTaskInput in = m.input(taskCtx); - - int prevKey = Integer.MIN_VALUE; - - while (in.next()) { - IntWritable key = (IntWritable)in.key(); - - assertTrue(key.get() > prevKey); - - prevKey = key.get(); - - Iterator<?> valsIter = in.values(); - - Collection<Integer> vals = mm.remove(key.get()); - - assertNotNull(vals); - - while (valsIter.hasNext()) { - IntWritable val = (IntWritable) valsIter.next(); - - assertTrue(vals.remove(val.get())); - } - - assertTrue(vals.isEmpty()); - } - - in.close(); - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java new file mode 100644 index 0000000..e98e82c --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.commons.collections.comparators.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Abstract class for maps test. + */ +public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { + /** + * Test task context. + */ + protected static class TaskContext extends HadoopTaskContext { + /** + */ + protected TaskContext() { + super(null, null); + } + + /** {@inheritDoc} */ + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public GridHadoopCounters counters() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopPartitioner partitioner() throws IgniteCheckedException { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization keySerialization() throws IgniteCheckedException { + return new HadoopWritableSerialization(IntWritable.class); + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException { + return new HadoopWritableSerialization(IntWritable.class); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> sortComparator() { + return ComparableComparator.getInstance(); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> groupComparator() { + return ComparableComparator.getInstance(); + } + + /** {@inheritDoc} */ + @Override public void run() throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + assert false; + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment() throws IgniteCheckedException { + assert false; + } + + @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { + assert false; + } + } + + /** + * Test job info. 
+ */ + protected static class JobInfo implements GridHadoopJobInfo { + /** {@inheritDoc} */ + @Nullable @Override public String property(String name) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasCombiner() { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasReducer() { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public int reducers() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public String jobName() { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public String user() { + assert false; + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java new file mode 100644 index 0000000..5b1b6a8 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import com.google.common.collect.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class HadoopHashMapSelfTest extends HadoopAbstractMapTest { + + public void _testAllocation() throws Exception { + final GridUnsafeMemory mem = new GridUnsafeMemory(0); + + long size = 3L * 1024 * 1024 * 1024; + + final long chunk = 16;// * 1024; + + final int page = 4 * 1024; + + final int writes = chunk < page ? 
1 : (int)(chunk / page); + + final long cnt = size / chunk; + + assert cnt < Integer.MAX_VALUE; + + final int threads = 4; + + long start = System.currentTimeMillis(); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + int cnt0 = (int)(cnt / threads); + + for (int i = 0; i < cnt0; i++) { + long ptr = mem.allocate(chunk); + + for (int j = 0; j < writes; j++) + mem.writeInt(ptr + j * page, 100500); + } + + return null; + } + }, threads); + + X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + mem.allocatedSize() + " cnt: " + cnt); + + Thread.sleep(30000); + } + + + /** */ + public void testMapSimple() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + +// mem.listen(new GridOffHeapEventListener() { +// @Override public void onEvent(GridOffHeapEvent evt) { +// if (evt == GridOffHeapEvent.ALLOCATE) +// U.dumpStack(); +// } +// }); + + Random rnd = new Random(); + + int mapSize = 16 << rnd.nextInt(3); + + HadoopTaskContext taskCtx = new TaskContext(); + + final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize); + + HadoopMultimap.Adder a = m.startAdding(taskCtx); + + Multimap<Integer, Integer> mm = ArrayListMultimap.create(); + + for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { + int key = rnd.nextInt(mapSize); + int val = rnd.nextInt(); + + a.write(new IntWritable(key), new IntWritable(val)); + mm.put(key, val); + + X.println("k: " + key + " v: " + val); + + a.close(); + + check(m, mm, taskCtx); + + a = m.startAdding(taskCtx); + } + +// a.add(new IntWritable(10), new IntWritable(2)); +// mm.put(10, 2); +// check(m, mm); + + a.close(); + + X.println("Alloc: " + mem.allocatedSize()); + + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + + private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, HadoopTaskContext taskCtx) throws Exception { + final HadoopTaskInput in = m.input(taskCtx); + + Map<Integer, Collection<Integer>> mmm = mm.asMap(); + + int keys = 0; + + while (in.next()) { + keys++; + + IntWritable k = (IntWritable)in.key(); + + assertNotNull(k); + + ArrayList<Integer> vs = new ArrayList<>(); + + Iterator<?> it = in.values(); + + while (it.hasNext()) + vs.add(((IntWritable) it.next()).get()); + + Collection<Integer> exp = mmm.get(k.get()); + + assertEquals(sorted(exp), sorted(vs)); + } + + X.println("keys: " + keys + " cap: " + m.capacity()); + + assertEquals(mmm.size(), keys); + + assertEquals(m.keys(), keys); + + in.close(); + } + + private GridLongList sorted(Collection<Integer> col) { + GridLongList lst = new GridLongList(col.size()); + + for (Integer i : col) + lst.add(i); + + return lst.sort(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java new file mode 100644 index 0000000..52512cf --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import com.google.common.collect.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.lang.Math.*; +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * Skip list tests. + */ +public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { + /** + * + */ + public void testLevel() { + Random rnd = new GridRandom(); + + int[] levelsCnts = new int[32]; + + int all = 10000; + + for (int i = 0; i < all; i++) { + int level = HadoopSkipList.randomLevel(rnd); + + levelsCnts[level]++; + } + + X.println("Distribution: " + Arrays.toString(levelsCnts)); + + for (int level = 0; level < levelsCnts.length; level++) { + int exp = (level + 1) == levelsCnts.length ? 
0 : all >>> (level + 1); + + double precission = 0.72 / Math.max(32 >>> level, 1); + + int sigma = max((int)ceil(precission * exp), 5); + + X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + + " sigma: " + sigma); + + assertTrue(abs(exp - levelsCnts[level]) <= sigma); + } + } + + public void testMapSimple() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + +// mem.listen(new GridOffHeapEventListener() { +// @Override public void onEvent(GridOffHeapEvent evt) { +// if (evt == GridOffHeapEvent.ALLOCATE) +// U.dumpStack(); +// } +// }); + + Random rnd = new Random(); + + int mapSize = 16 << rnd.nextInt(6); + + GridHadoopJobInfo job = new JobInfo(); + + HadoopTaskContext taskCtx = new TaskContext(); + + HadoopMultimap m = new HadoopSkipList(job, mem); + + HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); + + Multimap<Integer, Integer> mm = ArrayListMultimap.create(); + Multimap<Integer, Integer> vis = ArrayListMultimap.create(); + + for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { + int key = rnd.nextInt(mapSize); + int val = rnd.nextInt(); + + a.write(new IntWritable(key), new IntWritable(val)); + mm.put(key, val); + + X.println("k: " + key + " v: " + val); + + a.close(); + + check(m, mm, vis, taskCtx); + + a = m.startAdding(taskCtx); + } + +// a.add(new IntWritable(10), new IntWritable(2)); +// mm.put(10, 2); +// check(m, mm); + + a.close(); + + X.println("Alloc: " + mem.allocatedSize()); + + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + + private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) + throws Exception { + final HadoopTaskInput in = m.input(taskCtx); + + Map<Integer, Collection<Integer>> mmm = mm.asMap(); + + int keys = 0; + + int prevKey = Integer.MIN_VALUE; + + while (in.next()) { + keys++; + + IntWritable k = (IntWritable)in.key(); + + assertNotNull(k); + + assertTrue(k.get() > prevKey); + + prevKey = k.get(); + + Deque<Integer> vs = new LinkedList<>(); + + Iterator<?> it = in.values(); + + while (it.hasNext()) + vs.addFirst(((IntWritable) it.next()).get()); + + Collection<Integer> exp = mmm.get(k.get()); + + assertEquals(exp, vs); + } + + assertEquals(mmm.size(), keys); + +//! assertEquals(m.keys(), keys); + + // Check visitor. + + final byte[] buf = new byte[4]; + + final GridDataInput dataInput = new GridUnsafeDataInput(); + + m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { + /** */ + IntWritable key = new IntWritable(); + + /** */ + IntWritable val = new IntWritable(); + + @Override public void onKey(long keyPtr, int keySize) { + read(keyPtr, keySize, key); + } + + @Override public void onValue(long valPtr, int valSize) { + read(valPtr, valSize, val); + + vis.put(key.get(), val.get()); + } + + private void read(long ptr, int size, Writable w) { + assert size == 4 : size; + + UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); + + dataInput.bytes(buf, size); + + try { + w.readFields(dataInput); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + +// X.println("vis: " + vis); + + assertEquals(mm, vis); + + in.close(); + } + + /** + * @throws Exception if failed. 
+ */ + public void testMultiThreaded() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + X.println("___ Started"); + + Random rnd = new GridRandom(); + + for (int i = 0; i < 20; i++) { + GridHadoopJobInfo job = new JobInfo(); + + final HadoopTaskContext taskCtx = new TaskContext(); + + final HadoopMultimap m = new HadoopSkipList(job, mem); + + final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); + + X.println("___ MT"); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + X.println("___ TH in"); + + Random rnd = new GridRandom(); + + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + + HadoopMultimap.Adder a = m.startAdding(taskCtx); + + for (int i = 0; i < 50000; i++) { + int k = rnd.nextInt(32000); + int v = rnd.nextInt(); + + key.set(k); + val.set(v); + + a.write(key, val); + + Collection<Integer> list = mm.get(k); + + if (list == null) { + list = new ConcurrentLinkedQueue<>(); + + Collection<Integer> old = mm.putIfAbsent(k, list); + + if (old != null) + list = old; + } + + list.add(v); + } + + a.close(); + + X.println("___ TH out"); + + return null; + } + }, 3 + rnd.nextInt(27)); + + HadoopTaskInput in = m.input(taskCtx); + + int prevKey = Integer.MIN_VALUE; + + while (in.next()) { + IntWritable key = (IntWritable)in.key(); + + assertTrue(key.get() > prevKey); + + prevKey = key.get(); + + Iterator<?> valsIter = in.values(); + + Collection<Integer> vals = mm.remove(key.get()); + + assertNotNull(vals); + + while (valsIter.hasNext()) { + IntWritable val = (IntWritable) valsIter.next(); + + assertTrue(vals.remove(val.get())); + } + + assertTrue(vals.isEmpty()); + } + + in.close(); + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + } +}
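
Editor's note: the testLevel() method in both the removed GridHadoopSkipListSelfTest and its renamed replacement HadoopSkipListSelfTest rests on a simple statistical argument: a skip-list entry is assigned level L with probability 2^-(L+1), so out of `all` samples roughly `all >>> (level + 1)` should land on each level, and the assertion only requires the observed count to stay within a tolerance (`sigma`) proportional to that expectation. The snippet below is a minimal, self-contained sketch of that check; `randomLevel()` here is a hypothetical stand-in with the same geometric distribution, not the actual HadoopSkipList.randomLevel() implementation, and the class name is invented for illustration.

    import java.util.Arrays;
    import java.util.Random;

    /** Standalone sketch of the level-distribution check behind testLevel(). */
    public class SkipListLevelSketch {
        /** Stand-in generator: picks level L with probability 2^-(L+1), capped at 31. */
        static int randomLevel(Random rnd) {
            int level = 0;

            while (level < 31 && rnd.nextBoolean())
                level++;

            return level;
        }

        public static void main(String[] args) {
            Random rnd = new Random();

            int[] levelCnts = new int[32];

            int all = 10000;

            for (int i = 0; i < all; i++)
                levelCnts[randomLevel(rnd)]++;

            System.out.println("Distribution: " + Arrays.toString(levelCnts));

            for (int level = 0; level < levelCnts.length; level++) {
                // Expected count halves at every level; the last slot only absorbs the tail.
                int exp = (level + 1) == levelCnts.length ? 0 : all >>> (level + 1);

                // Same tolerance shape as the test: proportional at low levels, at least 5 everywhere.
                double precision = 0.72 / Math.max(32 >>> level, 1);
                int sigma = Math.max((int)Math.ceil(precision * exp), 5);

                if (Math.abs(exp - levelCnts[level]) > sigma)
                    throw new AssertionError("Level " + level + ": expected ~" + exp + ", got " + levelCnts[level]);
            }
        }
    }

At level 0 the expectation is 5000 with a tolerance of about 113, comfortably above the binomial standard deviation of ~50, which is why the probabilistic assertion is stable in practice.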
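
Editor's note: testMultiThreaded() verifies the concurrent adders against a reference data set that the writer threads assemble themselves: each thread records every (key, value) pair it writes into a shared ConcurrentMap, creating the per-key queue with putIfAbsent() so that two threads racing on the same key never drop a value. The sketch below isolates just that lock-free bookkeeping pattern with plain JDK classes; the thread count and the 50000/32000 constants mirror the test, the class name is hypothetical, and the off-heap multimap itself is deliberately omitted.

    import java.util.Collection;
    import java.util.Random;
    import java.util.concurrent.*;

    /** Sketch of the reference-data bookkeeping used by testMultiThreaded(). */
    public class ConcurrentBookkeepingSketch {
        public static void main(String[] args) throws Exception {
            final ConcurrentMap<Integer, Collection<Integer>> expected = new ConcurrentHashMap<>();

            int threads = 4;

            ExecutorService pool = Executors.newFixedThreadPool(threads);

            for (int t = 0; t < threads; t++) {
                pool.submit(new Callable<Object>() {
                    @Override public Object call() {
                        Random rnd = new Random();

                        for (int i = 0; i < 50000; i++) {
                            int k = rnd.nextInt(32000);
                            int v = rnd.nextInt();

                            // The real test also writes (k, v) into the off-heap multimap here.

                            Collection<Integer> list = expected.get(k);

                            if (list == null) {
                                list = new ConcurrentLinkedQueue<>();

                                Collection<Integer> old = expected.putIfAbsent(k, list);

                                if (old != null)
                                    list = old; // Another thread created the queue first; reuse it.
                            }

                            list.add(v);
                        }

                        return null;
                    }
                });
            }

            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);

            System.out.println("Distinct keys recorded: " + expected.size());
        }
    }

After the writers finish, the test drains this map from a single thread while iterating the multimap input, asserting that keys come back in strictly ascending order and that every recorded value is accounted for exactly once.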