IGNITE-471 - Fixed compilation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f28b7506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f28b7506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f28b7506

Branch: refs/heads/ignite-471
Commit: f28b7506671c19af0f8647591aecd7c9e47c1431
Parents: c08ad75
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Mar 11 22:27:03 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Mar 11 22:27:03 2015 -0700

----------------------------------------------------------------------
 .../HadoopConcurrentHashMultimapSelfTest.java   | 267 +++++++++++++++++++
 1 file changed, 267 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f28b7506/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
new file mode 100644
index 0000000..b527a79
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+
+/**
+ *
+ */
+public class HadoopConcurrentHashMultimapSelfTest extends HadoopAbstractMapTest {
+    /** */
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        int mapSize = 16 << rnd.nextInt(3);
+
+        HadoopJobInfo job = new JobInfo();
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
+
+        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            a.close();
+
+            check(m, mm, vis, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        assertEquals(0, mem.allocatedSize());
+    }
+
+    private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
+        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            Deque<Integer> vs = new LinkedList<>();
+
+            Iterator<?> it = in.values();
+
+            while (it.hasNext())
+                vs.addFirst(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
+            assertEquals(exp, vs);
+        }
+
+        assertEquals(mmm.size(), keys);
+
+        assertEquals(m.keys(), keys);
+
+        X.println("keys: " + keys + " cap: " + m.capacity());
+
+        // Check visitor.
+
+        final byte[] buf = new byte[4];
+
+        final GridDataInput dataInput = new GridUnsafeDataInput();
+
+        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+            /** */
+            IntWritable key = new IntWritable();
+
+            /** */
+            IntWritable val = new IntWritable();
+
+            @Override public void onKey(long keyPtr, int keySize) {
+                read(keyPtr, keySize, key);
+            }
+
+            @Override public void onValue(long valPtr, int valSize) {
+                read(valPtr, valSize, val);
+
+                vis.put(key.get(), val.get());
+            }
+
+            private void read(long ptr, int size, Writable w) {
+                assert size == 4 : size;
+
+                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
+
+                dataInput.bytes(buf, 0, size);
+
+                try {
+                    w.readFields(dataInput);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+//        X.println("vis: " + vis);
+
+        assertEquals(mm, vis);
+
+        in.close();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        X.println("___ Started");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < 20; i++) {
+            HadoopJobInfo job = new JobInfo();
+
+            final HadoopTaskContext taskCtx = new TaskContext();
+
+            final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
+
+            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+            X.println("___ MT");
+
+            multithreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    X.println("___ TH in");
+
+                    Random rnd = new GridRandom();
+
+                    IntWritable key = new IntWritable();
+                    IntWritable val = new IntWritable();
+
+                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+                    for (int i = 0; i < 50000; i++) {
+                        int k = rnd.nextInt(32000);
+                        int v = rnd.nextInt();
+
+                        key.set(k);
+                        val.set(v);
+
+                        a.write(key, val);
+
+                        Collection<Integer> list = mm.get(k);
+
+                        if (list == null) {
+                            list = new ConcurrentLinkedQueue<>();
+
+                            Collection<Integer> old = mm.putIfAbsent(k, list);
+
+                            if (old != null)
+                                list = old;
+                        }
+
+                        list.add(v);
+                    }
+
+                    a.close();
+
+                    X.println("___ TH out");
+
+                    return null;
+                }
+            }, 3 + rnd.nextInt(27));
+
+            X.println("___ Check: " + m.capacity());
+
+            assertEquals(mm.size(), m.keys());
+
+            assertTrue(m.capacity() > 32000);
+
+            HadoopTaskInput in = m.input(taskCtx);
+
+            while (in.next()) {
+                IntWritable key = (IntWritable) in.key();
+
+                Iterator<?> valsIter = in.values();
+
+                Collection<Integer> vals = mm.remove(key.get());
+
+                assertNotNull(vals);
+
+                while (valsIter.hasNext()) {
+                    IntWritable val = (IntWritable) valsIter.next();
+
+                    assertTrue(vals.remove(val.get()));
+                }
+
+                assertTrue(vals.isEmpty());
+            }
+
+            in.close();
+            m.close();
+
+            assertEquals(0, mem.allocatedSize());
+        }
+    }
+}