http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java new file mode 100644 index 0000000..558dec5 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.configuration.*; + +/** + * Configuration validation tests. + */ +public class HadoopValidationSelfTest extends HadoopAbstractSelfTest { + /** Peer class loading enabled flag. */ + public boolean peerClassLoading; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + + peerClassLoading = false; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(peerClassLoading); + + return cfg; + } + + /** + * Ensure that Grid starts when all configuration parameters are valid. + * + * @throws Exception If failed. + */ + public void testValid() throws Exception { + startGrids(1); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java deleted file mode 100644 index 40cf636..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; - -/** - * Example job for testing hadoop task execution. - */ -public class GridHadoopWordCount1 { - /** - * Entry point to start job. - * @param args command line parameters. - * @throws Exception if fails. - */ - public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.out.println("usage: [input] [output]"); - System.exit(-1); - } - - JobConf job = getJob(args[0], args[1]); - - JobClient.runJob(job); - } - - /** - * Gets fully configured JobConf instance. - * - * @param input input file name. - * @param output output directory name. - * @return Job configuration - */ - public static JobConf getJob(String input, String output) { - JobConf conf = new JobConf(GridHadoopWordCount1.class); - conf.setJobName("wordcount"); - - conf.setOutputKeyClass(Text.class); - conf.setOutputValueClass(IntWritable.class); - - setTasksClasses(conf, true, true, true); - - FileInputFormat.setInputPaths(conf, new Path(input)); - FileOutputFormat.setOutputPath(conf, new Path(output)); - - return conf; - } - - /** - * Sets task classes with related info if needed into configuration object. - * - * @param jobConf Configuration to change. - * @param setMapper Option to set mapper and input format classes. - * @param setCombiner Option to set combiner class. - * @param setReducer Option to set reducer and output format classes. - */ - public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) { - if (setMapper) { - jobConf.setMapperClass(GridHadoopWordCount1Map.class); - jobConf.setInputFormat(TextInputFormat.class); - } - - if (setCombiner) - jobConf.setCombinerClass(GridHadoopWordCount1Reduce.class); - - if (setReducer) { - jobConf.setReducerClass(GridHadoopWordCount1Reduce.class); - jobConf.setOutputFormat(TextOutputFormat.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java deleted file mode 100644 index 5d8e0cc..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Map.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; - -import java.io.*; -import java.util.*; - -/** - * Mapper phase of WordCount job. - */ -public class GridHadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { - /** Writable integer constant of '1' is writing as count of found words. */ - private static final IntWritable one = new IntWritable(1); - - /** Writable container for writing word. */ - private Text word = new Text(); - - /** Flag is to check that mapper was configured before run. */ - private boolean wasConfigured; - - /** {@inheritDoc} */ - @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter) - throws IOException { - - assert wasConfigured : "Mapper should be configured"; - - String line = val.toString(); - - StringTokenizer tokenizer = new StringTokenizer(line); - - while (tokenizer.hasMoreTokens()) { - word.set(tokenizer.nextToken()); - - output.collect(word, one); - } - } - - /** {@inheritDoc} */ - @Override public void configure(JobConf job) { - super.configure(job); - - wasConfigured = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java deleted file mode 100644 index 1b69a43..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount1Reduce.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; - -import java.io.*; -import java.util.*; - -/** - * Combiner and Reducer phase of WordCount job. - */ -public class GridHadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { - /** Flag is to check that mapper was configured before run. */ - private boolean wasConfigured; - - /** {@inheritDoc} */ - @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) - throws IOException { - assert wasConfigured : "Reducer should be configured"; - - int sum = 0; - - while (values.hasNext()) - sum += values.next().get(); - - output.collect(key, new IntWritable(sum)); - } - - @Override public void configure(JobConf job) { - super.configure(job); - - wasConfigured = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java deleted file mode 100644 index 6310363..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; - -import java.io.*; - -/** - * Example job for testing hadoop task execution. - */ -public class GridHadoopWordCount2 { - /** - * Entry point to start job. - * - * @param args Command line parameters. - * @throws Exception If fails. - */ - public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.out.println("usage: [input] [output]"); - System.exit(-1); - } - - Job job = getJob(args[0], args[1]); - - job.submit(); - } - - /** - * Gets fully configured Job instance. - * - * @param input Input file name. - * @param output Output directory name. - * @return Job instance. - * @throws IOException If fails. - */ - public static Job getJob(String input, String output) throws IOException { - Job job = Job.getInstance(); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - setTasksClasses(job, true, true, true); - - FileInputFormat.setInputPaths(job, new Path(input)); - FileOutputFormat.setOutputPath(job, new Path(output)); - - job.setJarByClass(GridHadoopWordCount2.class); - - return job; - } - - /** - * Sets task classes with related info if needed into configuration object. - * - * @param job Configuration to change. - * @param setMapper Option to set mapper and input format classes. - * @param setCombiner Option to set combiner class. - * @param setReducer Option to set reducer and output format classes. - */ - public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) { - if (setMapper) { - job.setMapperClass(GridHadoopWordCount2Mapper.class); - job.setInputFormatClass(TextInputFormat.class); - } - - if (setCombiner) - job.setCombinerClass(GridHadoopWordCount2Reducer.class); - - if (setReducer) { - job.setReducerClass(GridHadoopWordCount2Reducer.class); - job.setOutputFormatClass(TextOutputFormat.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java deleted file mode 100644 index 849928a..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Mapper.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; -import java.util.*; - -/** - * Mapper phase of WordCount job. - */ -public class GridHadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable { - /** Writable container for writing word. */ - private Text word = new Text(); - - /** Writable integer constant of '1' is writing as count of found words. */ - private static final IntWritable one = new IntWritable(1); - - /** Flag is to check that mapper was configured before run. */ - private boolean wasConfigured; - - /** Flag is to check that mapper was set up before run. */ - private boolean wasSetUp; - - /** {@inheritDoc} */ - @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - assert wasConfigured : "Mapper should be configured"; - assert wasSetUp : "Mapper should be set up"; - - StringTokenizer wordList = new StringTokenizer(val.toString()); - - while (wordList.hasMoreTokens()) { - word.set(wordList.nextToken()); - - ctx.write(word, one); - } - } - - /** {@inheritDoc} */ - @Override protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - wasSetUp = true; - } - - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - wasConfigured = true; - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java deleted file mode 100644 index 922bb2f..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/GridHadoopWordCount2Reducer.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.examples; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; - -import java.io.*; - -/** - * Combiner and Reducer phase of WordCount job. - */ -public class GridHadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable { - /** Writable container for writing sum of word counts. */ - private IntWritable totalWordCnt = new IntWritable(); - - /** Flag is to check that mapper was configured before run. */ - private boolean wasConfigured; - - /** Flag is to check that mapper was set up before run. */ - private boolean wasSetUp; - - /** {@inheritDoc} */ - @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException { - assert wasConfigured : "Reducer should be configured"; - assert wasSetUp : "Reducer should be set up"; - - int wordCnt = 0; - - for (IntWritable value : values) - wordCnt += value.get(); - - totalWordCnt.set(wordCnt); - - ctx.write(key, totalWordCnt); - } - - /** {@inheritDoc} */ - @Override protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - wasSetUp = true; - } - - /** {@inheritDoc} */ - @Override public void setConf(Configuration conf) { - wasConfigured = true; - } - - /** {@inheritDoc} */ - @Override public Configuration getConf() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java new file mode 100644 index 0000000..dd9058d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.examples; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; + +/** + * Example job for testing hadoop task execution. + */ +public class HadoopWordCount1 { + /** + * Entry point to start job. + * @param args command line parameters. + * @throws Exception if fails. + */ + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.out.println("usage: [input] [output]"); + System.exit(-1); + } + + JobConf job = getJob(args[0], args[1]); + + JobClient.runJob(job); + } + + /** + * Gets fully configured JobConf instance. + * + * @param input input file name. + * @param output output directory name. + * @return Job configuration + */ + public static JobConf getJob(String input, String output) { + JobConf conf = new JobConf(HadoopWordCount1.class); + conf.setJobName("wordcount"); + + conf.setOutputKeyClass(Text.class); + conf.setOutputValueClass(IntWritable.class); + + setTasksClasses(conf, true, true, true); + + FileInputFormat.setInputPaths(conf, new Path(input)); + FileOutputFormat.setOutputPath(conf, new Path(output)); + + return conf; + } + + /** + * Sets task classes with related info if needed into configuration object. + * + * @param jobConf Configuration to change. + * @param setMapper Option to set mapper and input format classes. + * @param setCombiner Option to set combiner class. + * @param setReducer Option to set reducer and output format classes. + */ + public static void setTasksClasses(JobConf jobConf, boolean setMapper, boolean setCombiner, boolean setReducer) { + if (setMapper) { + jobConf.setMapperClass(HadoopWordCount1Map.class); + jobConf.setInputFormat(TextInputFormat.class); + } + + if (setCombiner) + jobConf.setCombinerClass(HadoopWordCount1Reduce.class); + + if (setReducer) { + jobConf.setReducerClass(HadoopWordCount1Reduce.class); + jobConf.setOutputFormat(TextOutputFormat.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java new file mode 100644 index 0000000..c10a7fb --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Map.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.examples; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; + +import java.io.*; +import java.util.*; + +/** + * Mapper phase of WordCount job. + */ +public class HadoopWordCount1Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { + /** Writable integer constant of '1' is writing as count of found words. */ + private static final IntWritable one = new IntWritable(1); + + /** Writable container for writing word. */ + private Text word = new Text(); + + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** {@inheritDoc} */ + @Override public void map(LongWritable key, Text val, OutputCollector<Text, IntWritable> output, Reporter reporter) + throws IOException { + + assert wasConfigured : "Mapper should be configured"; + + String line = val.toString(); + + StringTokenizer tokenizer = new StringTokenizer(line); + + while (tokenizer.hasMoreTokens()) { + word.set(tokenizer.nextToken()); + + output.collect(word, one); + } + } + + /** {@inheritDoc} */ + @Override public void configure(JobConf job) { + super.configure(job); + + wasConfigured = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java new file mode 100644 index 0000000..76cd1c3 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount1Reduce.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.examples; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; + +import java.io.*; +import java.util.*; + +/** + * Combiner and Reducer phase of WordCount job. + */ +public class HadoopWordCount1Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) + throws IOException { + assert wasConfigured : "Reducer should be configured"; + + int sum = 0; + + while (values.hasNext()) + sum += values.next().get(); + + output.collect(key, new IntWritable(sum)); + } + + @Override public void configure(JobConf job) { + super.configure(job); + + wasConfigured = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java new file mode 100644 index 0000000..dc68df7 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.examples; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; + +import java.io.*; + +/** + * Example job for testing hadoop task execution. + */ +public class HadoopWordCount2 { + /** + * Entry point to start job. + * + * @param args Command line parameters. + * @throws Exception If fails. + */ + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.out.println("usage: [input] [output]"); + System.exit(-1); + } + + Job job = getJob(args[0], args[1]); + + job.submit(); + } + + /** + * Gets fully configured Job instance. + * + * @param input Input file name. + * @param output Output directory name. + * @return Job instance. + * @throws IOException If fails. + */ + public static Job getJob(String input, String output) throws IOException { + Job job = Job.getInstance(); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + setTasksClasses(job, true, true, true); + + FileInputFormat.setInputPaths(job, new Path(input)); + FileOutputFormat.setOutputPath(job, new Path(output)); + + job.setJarByClass(HadoopWordCount2.class); + + return job; + } + + /** + * Sets task classes with related info if needed into configuration object. + * + * @param job Configuration to change. + * @param setMapper Option to set mapper and input format classes. + * @param setCombiner Option to set combiner class. + * @param setReducer Option to set reducer and output format classes. + */ + public static void setTasksClasses(Job job, boolean setMapper, boolean setCombiner, boolean setReducer) { + if (setMapper) { + job.setMapperClass(HadoopWordCount2Mapper.class); + job.setInputFormatClass(TextInputFormat.class); + } + + if (setCombiner) + job.setCombinerClass(HadoopWordCount2Reducer.class); + + if (setReducer) { + job.setReducerClass(HadoopWordCount2Reducer.class); + job.setOutputFormatClass(TextOutputFormat.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java new file mode 100644 index 0000000..6ca7ccd --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Mapper.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.examples; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; + +import java.io.*; +import java.util.*; + +/** + * Mapper phase of WordCount job. + */ +public class HadoopWordCount2Mapper extends Mapper<Object, Text, Text, IntWritable> implements Configurable { + /** Writable container for writing word. */ + private Text word = new Text(); + + /** Writable integer constant of '1' is writing as count of found words. */ + private static final IntWritable one = new IntWritable(1); + + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** Flag is to check that mapper was set up before run. */ + private boolean wasSetUp; + + /** {@inheritDoc} */ + @Override public void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + assert wasConfigured : "Mapper should be configured"; + assert wasSetUp : "Mapper should be set up"; + + StringTokenizer wordList = new StringTokenizer(val.toString()); + + while (wordList.hasMoreTokens()) { + word.set(wordList.nextToken()); + + ctx.write(word, one); + } + } + + /** {@inheritDoc} */ + @Override protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + wasSetUp = true; + } + + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + wasConfigured = true; + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java new file mode 100644 index 0000000..fedaaf9 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2Reducer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.examples; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; + +import java.io.*; + +/** + * Combiner and Reducer phase of WordCount job. + */ +public class HadoopWordCount2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> implements Configurable { + /** Writable container for writing sum of word counts. */ + private IntWritable totalWordCnt = new IntWritable(); + + /** Flag is to check that mapper was configured before run. */ + private boolean wasConfigured; + + /** Flag is to check that mapper was set up before run. */ + private boolean wasSetUp; + + /** {@inheritDoc} */ + @Override public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException { + assert wasConfigured : "Reducer should be configured"; + assert wasSetUp : "Reducer should be set up"; + + int wordCnt = 0; + + for (IntWritable value : values) + wordCnt += value.get(); + + totalWordCnt.set(wordCnt); + + ctx.write(key, totalWordCnt); + } + + /** {@inheritDoc} */ + @Override protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + wasSetUp = true; + } + + /** {@inheritDoc} */ + @Override public void setConf(Configuration conf) { + wasConfigured = true; + } + + /** {@inheritDoc} */ + @Override public Configuration getConf() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java deleted file mode 100644 index 716fe19..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import org.apache.commons.collections.comparators.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.v2.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Abstract class for maps test. - */ -public abstract class GridHadoopAbstractMapTest extends GridCommonAbstractTest { - /** - * Test task context. - */ - protected static class TaskContext extends GridHadoopTaskContext { - /** - */ - protected TaskContext() { - super(null, null); - } - - /** {@inheritDoc} */ - @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopCounters counters() { - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException { - assert false; - - return null; - } - - /** {@inheritDoc} */ - @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException { - return new GridHadoopWritableSerialization(IntWritable.class); - } - - /** {@inheritDoc} */ - @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException { - return new GridHadoopWritableSerialization(IntWritable.class); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> sortComparator() { - return ComparableComparator.getInstance(); - } - - /** {@inheritDoc} */ - @Override public Comparator<Object> groupComparator() { - return ComparableComparator.getInstance(); - } - - /** {@inheritDoc} */ - @Override public void run() throws IgniteCheckedException { - assert false; - } - - /** {@inheritDoc} */ - @Override public void cancel() { - assert false; - } - - /** {@inheritDoc} */ - @Override public void prepareTaskEnvironment() throws IgniteCheckedException { - assert false; - } - - @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { - assert false; - } - } - - /** - * Test job info. - */ - protected static class JobInfo implements GridHadoopJobInfo { - /** {@inheritDoc} */ - @Nullable @Override public String property(String name) { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean hasCombiner() { - assert false; - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean hasReducer() { - assert false; - - return false; - } - - /** {@inheritDoc} */ - @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { - assert false; - - return null; - } - - /** {@inheritDoc} */ - @Override public int reducers() { - assert false; - - return 0; - } - - /** {@inheritDoc} */ - @Override public String jobName() { - assert false; - - return null; - } - - /** {@inheritDoc} */ - @Override public String user() { - assert false; - - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java deleted file mode 100644 index 88dfd2b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import com.google.common.collect.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * - */ -public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstractMapTest { - /** */ - public void testMapSimple() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - -// mem.listen(new GridOffHeapEventListener() { -// @Override public void onEvent(GridOffHeapEvent evt) { -// if (evt == GridOffHeapEvent.ALLOCATE) -// U.dumpStack(); -// } -// }); - - Random rnd = new Random(); - - int mapSize = 16 << rnd.nextInt(3); - - GridHadoopJobInfo job = new JobInfo(); - - GridHadoopTaskContext taskCtx = new TaskContext(); - - GridHadoopConcurrentHashMultimap m = new GridHadoopConcurrentHashMultimap(job, mem, mapSize); - - GridHadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); - - Multimap<Integer, Integer> mm = ArrayListMultimap.create(); - Multimap<Integer, Integer> vis = ArrayListMultimap.create(); - - for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { - int key = rnd.nextInt(mapSize); - int val = rnd.nextInt(); - - a.write(new IntWritable(key), new IntWritable(val)); - mm.put(key, val); - - X.println("k: " + key + " v: " + val); - - a.close(); - - check(m, mm, vis, taskCtx); - - a = m.startAdding(taskCtx); - } - -// a.add(new IntWritable(10), new IntWritable(2)); -// mm.put(10, 2); -// check(m, mm); - - a.close(); - - X.println("Alloc: " + mem.allocatedSize()); - - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - - private void check(GridHadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm, - final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) throws Exception { - final GridHadoopTaskInput in = m.input(taskCtx); - - Map<Integer, Collection<Integer>> mmm = mm.asMap(); - - int keys = 0; - - while (in.next()) { - keys++; - - IntWritable k = (IntWritable)in.key(); - - assertNotNull(k); - - Deque<Integer> vs = new LinkedList<>(); - - Iterator<?> it = in.values(); - - while (it.hasNext()) - vs.addFirst(((IntWritable) it.next()).get()); - - Collection<Integer> exp = mmm.get(k.get()); - - assertEquals(exp, vs); - } - - assertEquals(mmm.size(), keys); - - assertEquals(m.keys(), keys); - - X.println("keys: " + keys + " cap: " + m.capacity()); - - // Check visitor. - - final byte[] buf = new byte[4]; - - final GridDataInput dataInput = new GridUnsafeDataInput(); - - m.visit(false, new GridHadoopConcurrentHashMultimap.Visitor() { - /** */ - IntWritable key = new IntWritable(); - - /** */ - IntWritable val = new IntWritable(); - - @Override public void onKey(long keyPtr, int keySize) { - read(keyPtr, keySize, key); - } - - @Override public void onValue(long valPtr, int valSize) { - read(valPtr, valSize, val); - - vis.put(key.get(), val.get()); - } - - private void read(long ptr, int size, Writable w) { - assert size == 4 : size; - - UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); - - dataInput.bytes(buf, size); - - try { - w.readFields(dataInput); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - -// X.println("vis: " + vis); - - assertEquals(mm, vis); - - in.close(); - } - - /** - * @throws Exception if failed. - */ - public void testMultiThreaded() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - X.println("___ Started"); - - Random rnd = new GridRandom(); - - for (int i = 0; i < 20; i++) { - GridHadoopJobInfo job = new JobInfo(); - - final GridHadoopTaskContext taskCtx = new TaskContext(); - - final GridHadoopConcurrentHashMultimap m = new GridHadoopConcurrentHashMultimap(job, mem, 16); - - final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); - - X.println("___ MT"); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - X.println("___ TH in"); - - Random rnd = new GridRandom(); - - IntWritable key = new IntWritable(); - IntWritable val = new IntWritable(); - - GridHadoopMultimap.Adder a = m.startAdding(taskCtx); - - for (int i = 0; i < 50000; i++) { - int k = rnd.nextInt(32000); - int v = rnd.nextInt(); - - key.set(k); - val.set(v); - - a.write(key, val); - - Collection<Integer> list = mm.get(k); - - if (list == null) { - list = new ConcurrentLinkedQueue<>(); - - Collection<Integer> old = mm.putIfAbsent(k, list); - - if (old != null) - list = old; - } - - list.add(v); - } - - a.close(); - - X.println("___ TH out"); - - return null; - } - }, 3 + rnd.nextInt(27)); - - X.println("___ Check: " + m.capacity()); - - assertEquals(mm.size(), m.keys()); - - assertTrue(m.capacity() > 32000); - - GridHadoopTaskInput in = m.input(taskCtx); - - while (in.next()) { - IntWritable key = (IntWritable) in.key(); - - Iterator<?> valsIter = in.values(); - - Collection<Integer> vals = mm.remove(key.get()); - - assertNotNull(vals); - - while (valsIter.hasNext()) { - IntWritable val = (IntWritable) valsIter.next(); - - assertTrue(vals.remove(val.get())); - } - - assertTrue(vals.isEmpty()); - } - - in.close(); - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java deleted file mode 100644 index 92177ad..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import com.google.common.collect.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * - */ -public class GridHadoopHashMapSelfTest extends GridHadoopAbstractMapTest { - - public void _testAllocation() throws Exception { - final GridUnsafeMemory mem = new GridUnsafeMemory(0); - - long size = 3L * 1024 * 1024 * 1024; - - final long chunk = 16;// * 1024; - - final int page = 4 * 1024; - - final int writes = chunk < page ? 1 : (int)(chunk / page); - - final long cnt = size / chunk; - - assert cnt < Integer.MAX_VALUE; - - final int threads = 4; - - long start = System.currentTimeMillis(); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - int cnt0 = (int)(cnt / threads); - - for (int i = 0; i < cnt0; i++) { - long ptr = mem.allocate(chunk); - - for (int j = 0; j < writes; j++) - mem.writeInt(ptr + j * page, 100500); - } - - return null; - } - }, threads); - - X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + mem.allocatedSize() + " cnt: " + cnt); - - Thread.sleep(30000); - } - - - /** */ - public void testMapSimple() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - -// mem.listen(new GridOffHeapEventListener() { -// @Override public void onEvent(GridOffHeapEvent evt) { -// if (evt == GridOffHeapEvent.ALLOCATE) -// U.dumpStack(); -// } -// }); - - Random rnd = new Random(); - - int mapSize = 16 << rnd.nextInt(3); - - GridHadoopTaskContext taskCtx = new TaskContext(); - - final GridHadoopHashMultimap m = new GridHadoopHashMultimap(new JobInfo(), mem, mapSize); - - GridHadoopMultimap.Adder a = m.startAdding(taskCtx); - - Multimap<Integer, Integer> mm = ArrayListMultimap.create(); - - for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { - int key = rnd.nextInt(mapSize); - int val = rnd.nextInt(); - - a.write(new IntWritable(key), new IntWritable(val)); - mm.put(key, val); - - X.println("k: " + key + " v: " + val); - - a.close(); - - check(m, mm, taskCtx); - - a = m.startAdding(taskCtx); - } - -// a.add(new IntWritable(10), new IntWritable(2)); -// mm.put(10, 2); -// check(m, mm); - - a.close(); - - X.println("Alloc: " + mem.allocatedSize()); - - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - - private void check(GridHadoopHashMultimap m, Multimap<Integer, Integer> mm, GridHadoopTaskContext taskCtx) throws Exception { - final GridHadoopTaskInput in = m.input(taskCtx); - - Map<Integer, Collection<Integer>> mmm = mm.asMap(); - - int keys = 0; - - while (in.next()) { - keys++; - - IntWritable k = (IntWritable)in.key(); - - assertNotNull(k); - - ArrayList<Integer> vs = new ArrayList<>(); - - Iterator<?> it = in.values(); - - while (it.hasNext()) - vs.add(((IntWritable) it.next()).get()); - - Collection<Integer> exp = mmm.get(k.get()); - - assertEquals(sorted(exp), sorted(vs)); - } - - X.println("keys: " + keys + " cap: " + m.capacity()); - - assertEquals(mmm.size(), keys); - - assertEquals(m.keys(), keys); - - in.close(); - } - - private GridLongList sorted(Collection<Integer> col) { - GridLongList lst = new GridLongList(col.size()); - - for (Integer i : col) - lst.add(i); - - return lst.sort(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java deleted file mode 100644 index 6ba00ad..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import com.google.common.collect.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.lang.Math.*; -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Skip list tests. - */ -public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest { - /** - * - */ - public void testLevel() { - Random rnd = new GridRandom(); - - int[] levelsCnts = new int[32]; - - int all = 10000; - - for (int i = 0; i < all; i++) { - int level = GridHadoopSkipList.randomLevel(rnd); - - levelsCnts[level]++; - } - - X.println("Distribution: " + Arrays.toString(levelsCnts)); - - for (int level = 0; level < levelsCnts.length; level++) { - int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1); - - double precission = 0.72 / Math.max(32 >>> level, 1); - - int sigma = max((int)ceil(precission * exp), 5); - - X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + - " sigma: " + sigma); - - assertTrue(abs(exp - levelsCnts[level]) <= sigma); - } - } - - public void testMapSimple() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - -// mem.listen(new GridOffHeapEventListener() { -// @Override public void onEvent(GridOffHeapEvent evt) { -// if (evt == GridOffHeapEvent.ALLOCATE) -// U.dumpStack(); -// } -// }); - - Random rnd = new Random(); - - int mapSize = 16 << rnd.nextInt(6); - - GridHadoopJobInfo job = new JobInfo(); - - GridHadoopTaskContext taskCtx = new TaskContext(); - - GridHadoopMultimap m = new GridHadoopSkipList(job, mem); - - GridHadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); - - Multimap<Integer, Integer> mm = ArrayListMultimap.create(); - Multimap<Integer, Integer> vis = ArrayListMultimap.create(); - - for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { - int key = rnd.nextInt(mapSize); - int val = rnd.nextInt(); - - a.write(new IntWritable(key), new IntWritable(val)); - mm.put(key, val); - - X.println("k: " + key + " v: " + val); - - a.close(); - - check(m, mm, vis, taskCtx); - - a = m.startAdding(taskCtx); - } - -// a.add(new IntWritable(10), new IntWritable(2)); -// mm.put(10, 2); -// check(m, mm); - - a.close(); - - X.println("Alloc: " + mem.allocatedSize()); - - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - - private void check(GridHadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) - throws Exception { - final GridHadoopTaskInput in = m.input(taskCtx); - - Map<Integer, Collection<Integer>> mmm = mm.asMap(); - - int keys = 0; - - int prevKey = Integer.MIN_VALUE; - - while (in.next()) { - keys++; - - IntWritable k = (IntWritable)in.key(); - - assertNotNull(k); - - assertTrue(k.get() > prevKey); - - prevKey = k.get(); - - Deque<Integer> vs = new LinkedList<>(); - - Iterator<?> it = in.values(); - - while (it.hasNext()) - vs.addFirst(((IntWritable) it.next()).get()); - - Collection<Integer> exp = mmm.get(k.get()); - - assertEquals(exp, vs); - } - - assertEquals(mmm.size(), keys); - -//! assertEquals(m.keys(), keys); - - // Check visitor. - - final byte[] buf = new byte[4]; - - final GridDataInput dataInput = new GridUnsafeDataInput(); - - m.visit(false, new GridHadoopConcurrentHashMultimap.Visitor() { - /** */ - IntWritable key = new IntWritable(); - - /** */ - IntWritable val = new IntWritable(); - - @Override public void onKey(long keyPtr, int keySize) { - read(keyPtr, keySize, key); - } - - @Override public void onValue(long valPtr, int valSize) { - read(valPtr, valSize, val); - - vis.put(key.get(), val.get()); - } - - private void read(long ptr, int size, Writable w) { - assert size == 4 : size; - - UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); - - dataInput.bytes(buf, size); - - try { - w.readFields(dataInput); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - -// X.println("vis: " + vis); - - assertEquals(mm, vis); - - in.close(); - } - - /** - * @throws Exception if failed. - */ - public void testMultiThreaded() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - X.println("___ Started"); - - Random rnd = new GridRandom(); - - for (int i = 0; i < 20; i++) { - GridHadoopJobInfo job = new JobInfo(); - - final GridHadoopTaskContext taskCtx = new TaskContext(); - - final GridHadoopMultimap m = new GridHadoopSkipList(job, mem); - - final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); - - X.println("___ MT"); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - X.println("___ TH in"); - - Random rnd = new GridRandom(); - - IntWritable key = new IntWritable(); - IntWritable val = new IntWritable(); - - GridHadoopMultimap.Adder a = m.startAdding(taskCtx); - - for (int i = 0; i < 50000; i++) { - int k = rnd.nextInt(32000); - int v = rnd.nextInt(); - - key.set(k); - val.set(v); - - a.write(key, val); - - Collection<Integer> list = mm.get(k); - - if (list == null) { - list = new ConcurrentLinkedQueue<>(); - - Collection<Integer> old = mm.putIfAbsent(k, list); - - if (old != null) - list = old; - } - - list.add(v); - } - - a.close(); - - X.println("___ TH out"); - - return null; - } - }, 3 + rnd.nextInt(27)); - - GridHadoopTaskInput in = m.input(taskCtx); - - int prevKey = Integer.MIN_VALUE; - - while (in.next()) { - IntWritable key = (IntWritable)in.key(); - - assertTrue(key.get() > prevKey); - - prevKey = key.get(); - - Iterator<?> valsIter = in.values(); - - Collection<Integer> vals = mm.remove(key.get()); - - assertNotNull(vals); - - while (valsIter.hasNext()) { - IntWritable val = (IntWritable) valsIter.next(); - - assertTrue(vals.remove(val.get())); - } - - assertTrue(vals.isEmpty()); - } - - in.close(); - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java new file mode 100644 index 0000000..b4ed5e1 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.commons.collections.comparators.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.v2.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Abstract class for maps test. + */ +public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { + /** + * Test task context. + */ + protected static class TaskContext extends HadoopTaskContext { + /** + */ + protected TaskContext() { + super(null, null); + } + + /** {@inheritDoc} */ + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters() { + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopPartitioner partitioner() throws IgniteCheckedException { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization keySerialization() throws IgniteCheckedException { + return new HadoopWritableSerialization(IntWritable.class); + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException { + return new HadoopWritableSerialization(IntWritable.class); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> sortComparator() { + return ComparableComparator.getInstance(); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> groupComparator() { + return ComparableComparator.getInstance(); + } + + /** {@inheritDoc} */ + @Override public void run() throws IgniteCheckedException { + assert false; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + assert false; + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment() throws IgniteCheckedException { + assert false; + } + + @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { + assert false; + } + } + + /** + * Test job info. + */ + protected static class JobInfo implements HadoopJobInfo { + /** {@inheritDoc} */ + @Nullable @Override public String property(String name) { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasCombiner() { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasReducer() { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public int reducers() { + assert false; + + return 0; + } + + /** {@inheritDoc} */ + @Override public String jobName() { + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override public String user() { + assert false; + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java new file mode 100644 index 0000000..ae6bafa --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import com.google.common.collect.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * + */ +public class HadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest { + /** */ + public void testMapSimple() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + +// mem.listen(new GridOffHeapEventListener() { +// @Override public void onEvent(GridOffHeapEvent evt) { +// if (evt == GridOffHeapEvent.ALLOCATE) +// U.dumpStack(); +// } +// }); + + Random rnd = new Random(); + + int mapSize = 16 << rnd.nextInt(3); + + HadoopJobInfo job = new JobInfo(); + + HadoopTaskContext taskCtx = new TaskContext(); + + HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize); + + HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); + + Multimap<Integer, Integer> mm = ArrayListMultimap.create(); + Multimap<Integer, Integer> vis = ArrayListMultimap.create(); + + for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { + int key = rnd.nextInt(mapSize); + int val = rnd.nextInt(); + + a.write(new IntWritable(key), new IntWritable(val)); + mm.put(key, val); + + X.println("k: " + key + " v: " + val); + + a.close(); + + check(m, mm, vis, taskCtx); + + a = m.startAdding(taskCtx); + } + +// a.add(new IntWritable(10), new IntWritable(2)); +// mm.put(10, 2); +// check(m, mm); + + a.close(); + + X.println("Alloc: " + mem.allocatedSize()); + + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + + private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm, + final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception { + final HadoopTaskInput in = m.input(taskCtx); + + Map<Integer, Collection<Integer>> mmm = mm.asMap(); + + int keys = 0; + + while (in.next()) { + keys++; + + IntWritable k = (IntWritable)in.key(); + + assertNotNull(k); + + Deque<Integer> vs = new LinkedList<>(); + + Iterator<?> it = in.values(); + + while (it.hasNext()) + vs.addFirst(((IntWritable) it.next()).get()); + + Collection<Integer> exp = mmm.get(k.get()); + + assertEquals(exp, vs); + } + + assertEquals(mmm.size(), keys); + + assertEquals(m.keys(), keys); + + X.println("keys: " + keys + " cap: " + m.capacity()); + + // Check visitor. + + final byte[] buf = new byte[4]; + + final GridDataInput dataInput = new GridUnsafeDataInput(); + + m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { + /** */ + IntWritable key = new IntWritable(); + + /** */ + IntWritable val = new IntWritable(); + + @Override public void onKey(long keyPtr, int keySize) { + read(keyPtr, keySize, key); + } + + @Override public void onValue(long valPtr, int valSize) { + read(valPtr, valSize, val); + + vis.put(key.get(), val.get()); + } + + private void read(long ptr, int size, Writable w) { + assert size == 4 : size; + + UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); + + dataInput.bytes(buf, size); + + try { + w.readFields(dataInput); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + +// X.println("vis: " + vis); + + assertEquals(mm, vis); + + in.close(); + } + + /** + * @throws Exception if failed. + */ + public void testMultiThreaded() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + X.println("___ Started"); + + Random rnd = new GridRandom(); + + for (int i = 0; i < 20; i++) { + HadoopJobInfo job = new JobInfo(); + + final HadoopTaskContext taskCtx = new TaskContext(); + + final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16); + + final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); + + X.println("___ MT"); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + X.println("___ TH in"); + + Random rnd = new GridRandom(); + + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + + HadoopMultimap.Adder a = m.startAdding(taskCtx); + + for (int i = 0; i < 50000; i++) { + int k = rnd.nextInt(32000); + int v = rnd.nextInt(); + + key.set(k); + val.set(v); + + a.write(key, val); + + Collection<Integer> list = mm.get(k); + + if (list == null) { + list = new ConcurrentLinkedQueue<>(); + + Collection<Integer> old = mm.putIfAbsent(k, list); + + if (old != null) + list = old; + } + + list.add(v); + } + + a.close(); + + X.println("___ TH out"); + + return null; + } + }, 3 + rnd.nextInt(27)); + + X.println("___ Check: " + m.capacity()); + + assertEquals(mm.size(), m.keys()); + + assertTrue(m.capacity() > 32000); + + HadoopTaskInput in = m.input(taskCtx); + + while (in.next()) { + IntWritable key = (IntWritable) in.key(); + + Iterator<?> valsIter = in.values(); + + Collection<Integer> vals = mm.remove(key.get()); + + assertNotNull(vals); + + while (valsIter.hasNext()) { + IntWritable val = (IntWritable) valsIter.next(); + + assertTrue(vals.remove(val.get())); + } + + assertTrue(vals.isEmpty()); + } + + in.close(); + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + } +}