http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
deleted file mode 100644
index 43b1f02..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import com.google.common.collect.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- *
- */
-public class GridHadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
-    /** */
-    public void testMapSimple() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-//        mem.listen(new GridOffHeapEventListener() {
-//            @Override public void onEvent(GridOffHeapEvent evt) {
-//                if (evt == GridOffHeapEvent.ALLOCATE)
-//                    U.dumpStack();
-//            }
-//        });
-
-        Random rnd = new Random();
-
-        int mapSize = 16 << rnd.nextInt(3);
-
-        GridHadoopJobInfo job = new JobInfo();
-
-        HadoopTaskContext taskCtx = new TaskContext();
-
-        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
-
-        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
-
-        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
-
-        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
-            int key = rnd.nextInt(mapSize);
-            int val = rnd.nextInt();
-
-            a.write(new IntWritable(key), new IntWritable(val));
-            mm.put(key, val);
-
-            X.println("k: " + key + " v: " + val);
-
-            a.close();
-
-            check(m, mm, vis, taskCtx);
-
-            a = m.startAdding(taskCtx);
-        }
-
-//        a.add(new IntWritable(10), new IntWritable(2));
-//        mm.put(10, 2);
-//        check(m, mm);
-
-        a.close();
-
-        X.println("Alloc: " + mem.allocatedSize());
-
-        m.close();
-
-        assertEquals(0, mem.allocatedSize());
-    }
-
-    private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
-        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
-        final HadoopTaskInput in = m.input(taskCtx);
-
-        Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
-        int keys = 0;
-
-        while (in.next()) {
-            keys++;
-
-            IntWritable k = (IntWritable)in.key();
-
-            assertNotNull(k);
-
-            Deque<Integer> vs = new LinkedList<>();
-
-            Iterator<?> it = in.values();
-
-            while (it.hasNext())
-                vs.addFirst(((IntWritable) it.next()).get());
-
-            Collection<Integer> exp = mmm.get(k.get());
-
-            assertEquals(exp, vs);
-        }
-
-        assertEquals(mmm.size(), keys);
-
-        assertEquals(m.keys(), keys);
-
-        X.println("keys: " + keys + " cap: " + m.capacity());
-
-        // Check visitor.
-
-        final byte[] buf = new byte[4];
-
-        final GridDataInput dataInput = new GridUnsafeDataInput();
-
-        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
-            /** */
-            IntWritable key = new IntWritable();
-
-            /** */
-            IntWritable val = new IntWritable();
-
-            @Override public void onKey(long keyPtr, int keySize) {
-                read(keyPtr, keySize, key);
-            }
-
-            @Override public void onValue(long valPtr, int valSize) {
-                read(valPtr, valSize, val);
-
-                vis.put(key.get(), val.get());
-            }
-
-            private void read(long ptr, int size, Writable w) {
-                assert size == 4 : size;
-
-                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
-
-                dataInput.bytes(buf, size);
-
-                try {
-                    w.readFields(dataInput);
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-//        X.println("vis: " + vis);
-
-        assertEquals(mm, vis);
-
-        in.close();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testMultiThreaded() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-        X.println("___ Started");
-
-        Random rnd = new GridRandom();
-
-        for (int i = 0; i < 20; i++) {
-            GridHadoopJobInfo job = new JobInfo();
-
-            final HadoopTaskContext taskCtx = new TaskContext();
-
-            final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
-
-            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
-
-            X.println("___ MT");
-
-            multithreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    X.println("___ TH in");
-
-                    Random rnd = new GridRandom();
-
-                    IntWritable key = new IntWritable();
-                    IntWritable val = new IntWritable();
-
-                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
-                    for (int i = 0; i < 50000; i++) {
-                        int k = rnd.nextInt(32000);
-                        int v = rnd.nextInt();
-
-                        key.set(k);
-                        val.set(v);
-
-                        a.write(key, val);
-
-                        Collection<Integer> list = mm.get(k);
-
-                        if (list == null) {
-                            list = new ConcurrentLinkedQueue<>();
-
-                            Collection<Integer> old = mm.putIfAbsent(k, list);
-
-                            if (old != null)
-                                list = old;
-                        }
-
-                        list.add(v);
-                    }
-
-                    a.close();
-
-                    X.println("___ TH out");
-
-                    return null;
-                }
-            }, 3 + rnd.nextInt(27));
-
-            X.println("___ Check: " + m.capacity());
-
-            assertEquals(mm.size(), m.keys());
-
-            assertTrue(m.capacity() > 32000);
-
-            HadoopTaskInput in = m.input(taskCtx);
-
-            while (in.next()) {
-                IntWritable key = (IntWritable) in.key();
-
-                Iterator<?> valsIter = in.values();
-
-                Collection<Integer> vals = mm.remove(key.get());
-
-                assertNotNull(vals);
-
-                while (valsIter.hasNext()) {
-                    IntWritable val = (IntWritable) valsIter.next();
-
-                    assertTrue(vals.remove(val.get()));
-                }
-
-                assertTrue(vals.isEmpty());
-            }
-
-            in.close();
-            m.close();
-
-            assertEquals(0, mem.allocatedSize());
-        }
-    }
-}
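For reference, the lifecycle this deleted test (and its renamed replacement below) exercises against HadoopConcurrentHashMultimap is: startAdding() -> Adder.write() -> Adder.close() -> input()/next()/values() -> close(), with the invariant that closing the map returns every off-heap byte. A minimal sketch of that lifecycle, not part of this commit, assuming the JobInfo and TaskContext stubs from HadoopAbstractMapTest and the internal Ignite classes shown above:

// Sketch only: assumes the JobInfo/TaskContext stubs from HadoopAbstractMapTest
// and the internal Ignite APIs referenced by this commit.
import org.apache.hadoop.io.IntWritable;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;

public class MultimapLifecycleSketch {
    static void demo(HadoopJobInfo job, HadoopTaskContext taskCtx) throws Exception {
        GridUnsafeMemory mem = new GridUnsafeMemory(0); // 0 = no off-heap limit

        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);

        // All writes go through an Adder; close() publishes them to readers.
        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
        a.write(new IntWritable(1), new IntWritable(42));
        a.close();

        // Read side: iterate keys, then the values grouped under each key.
        // check() above shows values come back in reverse insertion order.
        HadoopTaskInput in = m.input(taskCtx);
        while (in.next()) {
            IntWritable key = (IntWritable)in.key();
            java.util.Iterator<?> vals = in.values();
            while (vals.hasNext())
                System.out.println(key.get() + " -> " + ((IntWritable)vals.next()).get());
        }
        in.close();

        m.close();
        assert mem.allocatedSize() == 0; // the map must free all off-heap memory
    }
}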
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index e98e82c..b4ed5e1 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -21,6 +21,8 @@ import org.apache.commons.collections.comparators.*;
 import org.apache.hadoop.io.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
@@ -47,7 +49,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
         }

         /** {@inheritDoc} */
-        @Override public GridHadoopCounters counters() {
+        @Override public HadoopCounters counters() {
             return null;
         }
@@ -101,7 +103,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
     /**
     * Test job info.
     */
-    protected static class JobInfo implements GridHadoopJobInfo {
+    protected static class JobInfo implements HadoopJobInfo {
        /** {@inheritDoc} */
        @Nullable @Override public String property(String name) {
            return null;
@@ -122,7 +124,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
        }

        /** {@inheritDoc} */
-       @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
+       @Override public HadoopJob createJob(HadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
            assert false;

            return null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
new file mode 100644
index 0000000..ae6bafa
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+
+/**
+ *
+ */
+public class HadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
+    /** */
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        int mapSize = 16 << rnd.nextInt(3);
+
+        HadoopJobInfo job = new JobInfo();
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
+
+        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            a.close();
+
+            check(m, mm, vis, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        assertEquals(0, mem.allocatedSize());
+    }
+
+    private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
+        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            Deque<Integer> vs = new LinkedList<>();
+
+            Iterator<?> it = in.values();
+
+            while (it.hasNext())
+                vs.addFirst(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
+            assertEquals(exp, vs);
+        }
+
+        assertEquals(mmm.size(), keys);
+
+        assertEquals(m.keys(), keys);
+
+        X.println("keys: " + keys + " cap: " + m.capacity());
+
+        // Check visitor.
+
+        final byte[] buf = new byte[4];
+
+        final GridDataInput dataInput = new GridUnsafeDataInput();
+
+        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+            /** */
+            IntWritable key = new IntWritable();
+
+            /** */
+            IntWritable val = new IntWritable();
+
+            @Override public void onKey(long keyPtr, int keySize) {
+                read(keyPtr, keySize, key);
+            }
+
+            @Override public void onValue(long valPtr, int valSize) {
+                read(valPtr, valSize, val);
+
+                vis.put(key.get(), val.get());
+            }
+
+            private void read(long ptr, int size, Writable w) {
+                assert size == 4 : size;
+
+                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
+
+                dataInput.bytes(buf, size);
+
+                try {
+                    w.readFields(dataInput);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+//        X.println("vis: " + vis);
+
+        assertEquals(mm, vis);
+
+        in.close();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        X.println("___ Started");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < 20; i++) {
+            HadoopJobInfo job = new JobInfo();
+
+            final HadoopTaskContext taskCtx = new TaskContext();
+
+            final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
+
+            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+            X.println("___ MT");
+
+            multithreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    X.println("___ TH in");
+
+                    Random rnd = new GridRandom();
+
+                    IntWritable key = new IntWritable();
+                    IntWritable val = new IntWritable();
+
+                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+                    for (int i = 0; i < 50000; i++) {
+                        int k = rnd.nextInt(32000);
+                        int v = rnd.nextInt();
+
+                        key.set(k);
+                        val.set(v);
+
+                        a.write(key, val);
+
+                        Collection<Integer> list = mm.get(k);
+
+                        if (list == null) {
+                            list = new ConcurrentLinkedQueue<>();
+
+                            Collection<Integer> old = mm.putIfAbsent(k, list);
+
+                            if (old != null)
+                                list = old;
+                        }
+
+                        list.add(v);
+                    }
+
+                    a.close();
+
+                    X.println("___ TH out");
+
+                    return null;
+                }
+            }, 3 + rnd.nextInt(27));
+
+            X.println("___ Check: " + m.capacity());
+
+            assertEquals(mm.size(), m.keys());
+
+            assertTrue(m.capacity() > 32000);
+
+            HadoopTaskInput in = m.input(taskCtx);
+
+            while (in.next()) {
+                IntWritable key = (IntWritable) in.key();
+
+                Iterator<?> valsIter = in.values();
+
+                Collection<Integer> vals = mm.remove(key.get());
+
+                assertNotNull(vals);
+
+                while (valsIter.hasNext()) {
+                    IntWritable val = (IntWritable) valsIter.next();
+
+                    assertTrue(vals.remove(val.get()));
+                }
+
+                assertTrue(vals.isEmpty());
+            }
+
+            in.close();
+            m.close();
+
+            assertEquals(0, mem.allocatedSize());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
index 52512cf..8a046e0 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -82,7 +82,7 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {

         int mapSize = 16 << rnd.nextInt(6);

-        GridHadoopJobInfo job = new JobInfo();
+        HadoopJobInfo job = new JobInfo();

         HadoopTaskContext taskCtx = new TaskContext();

@@ -216,7 +216,7 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
         Random rnd = new GridRandom();

         for (int i = 0; i < 20; i++) {
-            GridHadoopJobInfo job = new JobInfo();
+            HadoopJobInfo job = new JobInfo();

             final HadoopTaskContext taskCtx = new TaskContext();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
index 3735aab..837db37 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.*;
@@ -54,8 +55,8 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest
     }

     /** {@inheritDoc} */
-    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
-        GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);

         cfg.setExternalExecution(true);

@@ -92,7 +93,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest

         job.setJarByClass(getClass());

-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1),
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
             createJobInfo(job.getConfiguration()));

         fut.get();
@@ -128,7 +129,7 @@ public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest

         job.setJarByClass(getClass());

-        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1),
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
             createJobInfo(job.getConfiguration()));

         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/28fad185/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 1398886..2aa22db 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -88,7 +88,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
         suite.addTest(new TestSuite(ldr.loadClass(HadoopHashMapSelfTest.class.getName())));
         suite.addTest(new TestSuite(ldr.loadClass(HadoopDataStreamSelfTest.class.getName())));

-        suite.addTest(new TestSuite(ldr.loadClass(GridHadoopConcurrentHashMultimapSelftest.class.getName())));
+        suite.addTest(new TestSuite(ldr.loadClass(HadoopConcurrentHashMultimapSelftest.class.getName())));

         suite.addTest(new TestSuite(ldr.loadClass(HadoopSkipListSelfTest.class.getName())));
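The suite change above is forced by the rename: IgniteHadoopTestSuite registers each test class by re-loading it by name through a dedicated class loader (ldr), so a stale class name fails at suite construction instead of silently skipping the test. A sketch of that registration pattern follows; the plain class loader here is a stand-in assumption, since the real ldr is built elsewhere in the suite with Hadoop-specific classpath handling:

// Sketch only: illustrates the loadClass registration pattern used above.
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;

public class SuiteRegistrationSketch {
    public static TestSuite suite() throws Exception {
        TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite");

        // Stand-in for the suite's dedicated class loader.
        ClassLoader ldr = SuiteRegistrationSketch.class.getClassLoader();

        // X.class.getName() keeps the class name compile-checked (a rename that
        // misses this line breaks the build), while loadClass() makes the suite
        // run the loader's copy of the class.
        suite.addTest(new TestSuite(
            ldr.loadClass(HadoopConcurrentHashMultimapSelftest.class.getName())));

        return suite;
    }
}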