IGNITE-471 - Fixed compilation

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f28b7506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f28b7506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f28b7506

Branch: refs/heads/ignite-471
Commit: f28b7506671c19af0f8647591aecd7c9e47c1431
Parents: c08ad75
Author: Valentin Kulichenko <vkuliche...@gridgain.com>
Authored: Wed Mar 11 22:27:03 2015 -0700
Committer: Valentin Kulichenko <vkuliche...@gridgain.com>
Committed: Wed Mar 11 22:27:03 2015 -0700

----------------------------------------------------------------------
 .../HadoopConcurrentHashMultimapSelfTest.java   | 267 +++++++++++++++++++
 1 file changed, 267 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f28b7506/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
new file mode 100644
index 0000000..b527a79
--- /dev/null
+++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelfTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static 
org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+
+/**
+ *
+ */
+public class HadoopConcurrentHashMultimapSelfTest extends 
HadoopAbstractMapTest {
+    /** */
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        int mapSize = 16 << rnd.nextInt(3);
+
+        HadoopJobInfo job = new JobInfo();
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, 
mem, mapSize);
+
+        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            a.close();
+
+            check(m, mm, vis, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        assertEquals(0, mem.allocatedSize());
+    }
+
+    private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, 
Integer> mm,
+        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) 
throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            Deque<Integer> vs = new LinkedList<>();
+
+            Iterator<?> it = in.values();
+
+            while (it.hasNext())
+                vs.addFirst(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
+            assertEquals(exp, vs);
+        }
+
+        assertEquals(mmm.size(), keys);
+
+        assertEquals(m.keys(), keys);
+
+        X.println("keys: " + keys + " cap: " + m.capacity());
+
+        // Check visitor.
+
+        final byte[] buf = new byte[4];
+
+        final GridDataInput dataInput = new GridUnsafeDataInput();
+
+        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+            /** */
+            IntWritable key = new IntWritable();
+
+            /** */
+            IntWritable val = new IntWritable();
+
+            @Override public void onKey(long keyPtr, int keySize) {
+                read(keyPtr, keySize, key);
+            }
+
+            @Override public void onValue(long valPtr, int valSize) {
+                read(valPtr, valSize, val);
+
+                vis.put(key.get(), val.get());
+            }
+
+            private void read(long ptr, int size, Writable w) {
+                assert size == 4 : size;
+
+                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
+
+                dataInput.bytes(buf, 0, size);
+
+                try {
+                    w.readFields(dataInput);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+//        X.println("vis: " + vis);
+
+        assertEquals(mm, vis);
+
+        in.close();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        X.println("___ Started");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < 20; i++) {
+            HadoopJobInfo job = new JobInfo();
+
+            final HadoopTaskContext taskCtx = new TaskContext();
+
+            final HadoopConcurrentHashMultimap m = new 
HadoopConcurrentHashMultimap(job, mem, 16);
+
+            final ConcurrentMap<Integer, Collection<Integer>> mm = new 
ConcurrentHashMap<>();
+
+            X.println("___ MT");
+
+            multithreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    X.println("___ TH in");
+
+                    Random rnd = new GridRandom();
+
+                    IntWritable key = new IntWritable();
+                    IntWritable val = new IntWritable();
+
+                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+                    for (int i = 0; i < 50000; i++) {
+                        int k = rnd.nextInt(32000);
+                        int v = rnd.nextInt();
+
+                        key.set(k);
+                        val.set(v);
+
+                        a.write(key, val);
+
+                        Collection<Integer> list = mm.get(k);
+
+                        if (list == null) {
+                            list = new ConcurrentLinkedQueue<>();
+
+                            Collection<Integer> old = mm.putIfAbsent(k, list);
+
+                            if (old != null)
+                                list = old;
+                        }
+
+                        list.add(v);
+                    }
+
+                    a.close();
+
+                    X.println("___ TH out");
+
+                    return null;
+                }
+            }, 3 + rnd.nextInt(27));
+
+            X.println("___ Check: " + m.capacity());
+
+            assertEquals(mm.size(), m.keys());
+
+            assertTrue(m.capacity() > 32000);
+
+            HadoopTaskInput in = m.input(taskCtx);
+
+            while (in.next()) {
+                IntWritable key = (IntWritable) in.key();
+
+                Iterator<?> valsIter = in.values();
+
+                Collection<Integer> vals = mm.remove(key.get());
+
+                assertNotNull(vals);
+
+                while (valsIter.hasNext()) {
+                    IntWritable val = (IntWritable) valsIter.next();
+
+                    assertTrue(vals.remove(val.get()));
+                }
+
+                assertTrue(vals.isEmpty());
+            }
+
+            in.close();
+            m.close();
+
+            assertEquals(0, mem.allocatedSize());
+        }
+    }
+}

Reply via email to