Repository: incubator-ignite Updated Branches: refs/heads/ignite-471 5a7254639 -> f28b75066
IGNITE-471 - Fixed compilation Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c08ad75d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c08ad75d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c08ad75d Branch: refs/heads/ignite-471 Commit: c08ad75dc02cfef93e781a62aa5cc8f56e6b8f59 Parents: 5a72546 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Mar 11 22:26:27 2015 -0700 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Mar 11 22:26:27 2015 -0700 ---------------------------------------------------------------------- .../HadoopConcurrentHashMultimapSelftest.java | 267 ------------------- 1 file changed, 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c08ad75d/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java deleted file mode 100644 index b527a79..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import com.google.common.collect.*; -import org.apache.hadoop.io.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * - */ -public class HadoopConcurrentHashMultimapSelfTest extends HadoopAbstractMapTest { - /** */ - public void testMapSimple() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - -// mem.listen(new GridOffHeapEventListener() { -// @Override public void onEvent(GridOffHeapEvent evt) { -// if (evt == GridOffHeapEvent.ALLOCATE) -// U.dumpStack(); -// } -// }); - - Random rnd = new Random(); - - int mapSize = 16 << rnd.nextInt(3); - - HadoopJobInfo job = new JobInfo(); - - HadoopTaskContext taskCtx = new TaskContext(); - - HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize); - - HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); - - Multimap<Integer, Integer> mm = ArrayListMultimap.create(); - Multimap<Integer, Integer> vis = ArrayListMultimap.create(); - - for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { - int key = rnd.nextInt(mapSize); - int val = rnd.nextInt(); - - a.write(new IntWritable(key), new IntWritable(val)); - mm.put(key, val); - - X.println("k: " + key + " v: " + val); - - a.close(); - - check(m, mm, vis, taskCtx); - - a = m.startAdding(taskCtx); - } - -// a.add(new IntWritable(10), new IntWritable(2)); -// mm.put(10, 2); -// check(m, mm); - - a.close(); - - X.println("Alloc: " + mem.allocatedSize()); - - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - - private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm, - final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception { - final HadoopTaskInput in = m.input(taskCtx); - - Map<Integer, Collection<Integer>> mmm = mm.asMap(); - - int keys = 0; - - while (in.next()) { - keys++; - - IntWritable k = (IntWritable)in.key(); - - assertNotNull(k); - - Deque<Integer> vs = new LinkedList<>(); - - Iterator<?> it = in.values(); - - while (it.hasNext()) - vs.addFirst(((IntWritable) it.next()).get()); - - Collection<Integer> exp = mmm.get(k.get()); - - assertEquals(exp, vs); - } - - assertEquals(mmm.size(), keys); - - assertEquals(m.keys(), keys); - - X.println("keys: " + keys + " cap: " + m.capacity()); - - // Check visitor. - - final byte[] buf = new byte[4]; - - final GridDataInput dataInput = new GridUnsafeDataInput(); - - m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { - /** */ - IntWritable key = new IntWritable(); - - /** */ - IntWritable val = new IntWritable(); - - @Override public void onKey(long keyPtr, int keySize) { - read(keyPtr, keySize, key); - } - - @Override public void onValue(long valPtr, int valSize) { - read(valPtr, valSize, val); - - vis.put(key.get(), val.get()); - } - - private void read(long ptr, int size, Writable w) { - assert size == 4 : size; - - UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); - - dataInput.bytes(buf, 0, size); - - try { - w.readFields(dataInput); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - -// X.println("vis: " + vis); - - assertEquals(mm, vis); - - in.close(); - } - - /** - * @throws Exception if failed. - */ - public void testMultiThreaded() throws Exception { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - X.println("___ Started"); - - Random rnd = new GridRandom(); - - for (int i = 0; i < 20; i++) { - HadoopJobInfo job = new JobInfo(); - - final HadoopTaskContext taskCtx = new TaskContext(); - - final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16); - - final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>(); - - X.println("___ MT"); - - multithreaded(new Callable<Object>() { - @Override public Object call() throws Exception { - X.println("___ TH in"); - - Random rnd = new GridRandom(); - - IntWritable key = new IntWritable(); - IntWritable val = new IntWritable(); - - HadoopMultimap.Adder a = m.startAdding(taskCtx); - - for (int i = 0; i < 50000; i++) { - int k = rnd.nextInt(32000); - int v = rnd.nextInt(); - - key.set(k); - val.set(v); - - a.write(key, val); - - Collection<Integer> list = mm.get(k); - - if (list == null) { - list = new ConcurrentLinkedQueue<>(); - - Collection<Integer> old = mm.putIfAbsent(k, list); - - if (old != null) - list = old; - } - - list.add(v); - } - - a.close(); - - X.println("___ TH out"); - - return null; - } - }, 3 + rnd.nextInt(27)); - - X.println("___ Check: " + m.capacity()); - - assertEquals(mm.size(), m.keys()); - - assertTrue(m.capacity() > 32000); - - HadoopTaskInput in = m.input(taskCtx); - - while (in.next()) { - IntWritable key = (IntWritable) in.key(); - - Iterator<?> valsIter = in.values(); - - Collection<Integer> vals = mm.remove(key.get()); - - assertNotNull(vals); - - while (valsIter.hasNext()) { - IntWritable val = (IntWritable) valsIter.next(); - - assertTrue(vals.remove(val.get())); - } - - assertTrue(vals.isEmpty()); - } - - in.close(); - m.close(); - - assertEquals(0, mem.allocatedSize()); - } - } -}