http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java deleted file mode 100644 index a2c626c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java +++ /dev/null @@ -1,726 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Skip list. - */ -public class GridHadoopSkipList extends GridHadoopMultimapBase { - /** */ - private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive. - - /** Top level. */ - private final AtomicInteger topLevel = new AtomicInteger(-1); - - /** Heads for all the lists. */ - private final long heads; - - /** */ - private final AtomicBoolean visitGuard = new AtomicBoolean(); - - /** - * @param jobInfo Job info. - * @param mem Memory. - */ - public GridHadoopSkipList(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) { - super(jobInfo, mem); - - heads = mem.allocate(HEADS_SIZE, true); - } - - /** {@inheritDoc} */ - @Override public void close() { - super.close(); - - mem.release(heads, HEADS_SIZE); - } - - /** {@inheritDoc} */ - @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { - if (!visitGuard.compareAndSet(false, true)) - return false; - - for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) { - long valPtr = value(meta); - - long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta); - - if (valPtr != lastVisited) { - long k = key(meta); - - v.onKey(k + 4, keySize(k)); - - lastVisitedValue(meta, valPtr); // Set it to the first value in chain. 
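// (Value records are laid out off-heap as: [0, 8) next value pointer, where 0
// terminates the chain; [8, 12) value size as an int; [12, 12 + size) the
// serialized bytes. See nextValue()/valueSize() in the multimap base class;
// this is why the visitor below is handed valPtr + 12.)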
- - do { - v.onValue(valPtr + 12, valueSize(valPtr)); - - valPtr = nextValue(valPtr); - } - while (valPtr != lastVisited); - } - } - - visitGuard.lazySet(false); - - return true; - } - - /** {@inheritDoc} */ - @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException { - return new AdderImpl(ctx); - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - Input in = new Input(taskCtx); - - Comparator<Object> grpCmp = taskCtx.groupComparator(); - - if (grpCmp != null) - return new GroupedInput(grpCmp, in); - - return in; - } - - /** - * @param meta Meta pointer. - * @return Key pointer. - */ - private long key(long meta) { - return mem.readLong(meta); - } - - /** - * @param meta Meta pointer. - * @param key Key pointer. - */ - private void key(long meta, long key) { - mem.writeLong(meta, key); - } - - /** - * @param meta Meta pointer. - * @return Value pointer. - */ - private long value(long meta) { - return mem.readLongVolatile(meta + 8); - } - - /** - * @param meta Meta pointer. - * @param valPtr Value pointer. - */ - private void value(long meta, long valPtr) { - mem.writeLongVolatile(meta + 8, valPtr); - } - - /** - * @param meta Meta pointer. - * @param oldValPtr Old first value pointer. - * @param newValPtr New first value pointer. - * @return {@code true} If operation succeeded. - */ - private boolean casValue(long meta, long oldValPtr, long newValPtr) { - return mem.casLong(meta + 8, oldValPtr, newValPtr); - } - - /** - * @param meta Meta pointer. - * @return Last visited value pointer. - */ - private long lastVisitedValue(long meta) { - return mem.readLong(meta + 16); - } - - /** - * @param meta Meta pointer. - * @param valPtr Last visited value pointer. - */ - private void lastVisitedValue(long meta, long valPtr) { - mem.writeLong(meta + 16, valPtr); - } - - /** - * @param meta Meta pointer. - * @param level Level. - * @return Next meta pointer. - */ - private long nextMeta(long meta, int level) { - assert meta > 0 : meta; - - return mem.readLongVolatile(meta + 24 + 8 * level); - } - - /** - * @param meta Meta pointer. - * @param level Level. - * @param oldNext Old next meta pointer. - * @param newNext New next meta pointer. - * @return {@code true} If operation succeeded. - */ - private boolean casNextMeta(long meta, int level, long oldNext, long newNext) { - assert meta > 0 : meta; - - return mem.casLong(meta + 24 + 8 * level, oldNext, newNext); - } - - /** - * @param meta Meta pointer. - * @param level Level. - * @param nextMeta Next meta. - */ - private void nextMeta(long meta, int level, long nextMeta) { - assert meta != 0; - - mem.writeLong(meta + 24 + 8 * level, nextMeta); - } - - /** - * @param keyPtr Key pointer. - * @return Key size. - */ - private int keySize(long keyPtr) { - return mem.readInt(keyPtr); - } - - /** - * @param keyPtr Key pointer. - * @param keySize Key size. - */ - private void keySize(long keyPtr, int keySize) { - mem.writeInt(keyPtr, keySize); - } - - /** - * @param rnd Random. - * @return Next level. - */ - public static int randomLevel(Random rnd) { - int x = rnd.nextInt(); - - int level = 0; - - while ((x & 1) != 0) { // Count sequential 1 bits. - level++; - - x >>>= 1; - } - - return level; - } - - /** - * Reader. - */ - private class Reader extends ReaderBase { - /** - * @param ser Serialization. - */ - protected Reader(GridHadoopSerialization ser) { - super(ser); - } - - /** - * @param meta Meta pointer. - * @return Key. 
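*         The key record is a 4-byte size prefix followed by the serialized
*         bytes, hence the k + 4 data offset in the body below.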
- */ - public Object readKey(long meta) { - assert meta > 0 : meta; - - long k = key(meta); - - try { - return read(k + 4, keySize(k)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - } - - /** - * Adder. - */ - private class AdderImpl extends AdderBase { - /** */ - private final Comparator<Object> cmp; - - /** */ - private final Random rnd = new GridRandom(); - - /** */ - private final GridLongList stack = new GridLongList(16); - - /** */ - private final Reader keyReader; - - /** - * @param ctx Task context. - * @throws IgniteCheckedException If failed. - */ - protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException { - super(ctx); - - keyReader = new Reader(keySer); - - cmp = ctx.sortComparator(); - } - - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) throws IgniteCheckedException { - A.notNull(val, "val"); - - add(key, val); - } - - /** {@inheritDoc} */ - @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { - KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse; - - k.tmpKey = keySer.read(in, k.tmpKey); - - k.meta = add(k.tmpKey, null); - - return k; - } - - /** - * @param key Key. - * @param val Value. - * @param level Level. - * @return Meta pointer. - */ - private long createMeta(long key, long val, int level) { - int size = 32 + 8 * level; - - long meta = allocate(size); - - key(meta, key); - value(meta, val); - lastVisitedValue(meta, 0L); - - for (int i = 32; i < size; i += 8) // Fill with 0. - mem.writeLong(meta + i, 0L); - - return meta; - } - - /** - * @param key Key. - * @return Pointer. - * @throws IgniteCheckedException If failed. - */ - private long writeKey(Object key) throws IgniteCheckedException { - long keyPtr = write(4, key, keySer); - int keySize = writtenSize() - 4; - - keySize(keyPtr, keySize); - - return keyPtr; - } - - /** - * @param prevMeta Previous meta. - * @param meta Next meta. - */ - private void stackPush(long prevMeta, long meta) { - stack.add(prevMeta); - stack.add(meta); - } - - /** - * Drops last remembered frame from the stack. - */ - private void stackPop() { - stack.pop(2); - } - - /** - * @param key Key. - * @param val Value. - * @return Meta pointer. - * @throws IgniteCheckedException If failed. - */ - private long add(Object key, @Nullable Object val) throws IgniteCheckedException { - assert key != null; - - stack.clear(); - - long valPtr = 0; - - if (val != null) { // Write value. - valPtr = write(12, val, valSer); - int valSize = writtenSize() - 12; - - nextValue(valPtr, 0); - valueSize(valPtr, valSize); - } - - long keyPtr = 0; - long newMeta = 0; - int newMetaLevel = -1; - - long prevMeta = heads; - int level = topLevel.get(); - long meta = level < 0 ? 0 : nextMeta(heads, level); - - for (;;) { - if (level < 0) { // We did not find our key, trying to add new meta. - if (keyPtr == 0) { // Write key and create meta only once. - keyPtr = writeKey(key); - - newMetaLevel = randomLevel(rnd); - newMeta = createMeta(keyPtr, valPtr, newMetaLevel); - } - - nextMeta(newMeta, 0, meta); // Set next to new meta before publishing. - - if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully. - laceUp(key, newMeta, newMetaLevel); - - return newMeta; - } - else { // Add failed, need to check out what was added by another thread. - meta = nextMeta(prevMeta, level = 0); - - stackPop(); - } - } - - int cmpRes = cmp(key, meta); - - if (cmpRes == 0) { // Key found. 
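// (An existing key was found: if a value was written, it is prepended to the
// key's value chain by CASing the meta's first-value pointer, giving a
// lock-free LIFO list; a failed CAS simply re-reads the head and retries.)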
- if (newMeta != 0) // Deallocate if we've allocated something. - localDeallocate(keyPtr); - - if (valPtr == 0) // Only key needs to be added. - return meta; - - for (;;) { // Add value for the key found. - long nextVal = value(meta); - - nextValue(valPtr, nextVal); - - if (casValue(meta, nextVal, valPtr)) - return meta; - } - } - - assert cmpRes != 0; - - if (cmpRes > 0) { // Go right. - prevMeta = meta; - meta = nextMeta(meta, level); - - if (meta != 0) // If nothing to the right then go down. - continue; - } - - while (--level >= 0) { // Go down. - stackPush(prevMeta, meta); // Remember the path. - - long nextMeta = nextMeta(prevMeta, level); - - if (nextMeta != meta) { // If the meta is the same as on upper level go deeper. - meta = nextMeta; - - assert meta != 0; - - break; - } - } - } - } - - /** - * @param key Key. - * @param meta Meta pointer. - * @return Comparison result. - */ - @SuppressWarnings("unchecked") - private int cmp(Object key, long meta) { - assert meta != 0; - - return cmp.compare(key, keyReader.readKey(meta)); - } - - /** - * Adds appropriate index links between metas. - * - * @param newMeta Just added meta. - * @param newMetaLevel New level. - */ - private void laceUp(Object key, long newMeta, int newMetaLevel) { - for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up. - long prevMeta = heads; - long meta = 0; - - if (!stack.isEmpty()) { // Get the path back. - meta = stack.remove(); - prevMeta = stack.remove(); - } - - for (;;) { - nextMeta(newMeta, level, meta); - - if (casNextMeta(prevMeta, level, meta, newMeta)) - break; - - long oldMeta = meta; - - meta = nextMeta(prevMeta, level); // Reread meta. - - for (;;) { - int cmpRes = cmp(key, meta); - - if (cmpRes > 0) { // Go right. - prevMeta = meta; - meta = nextMeta(prevMeta, level); - - if (meta != oldMeta) // Old meta already known to be greater than ours or is 0. - continue; - } - - assert cmpRes != 0; // Two different metas with equal keys must be impossible. - - break; // Retry cas. - } - } - } - - if (!stack.isEmpty()) - return; // Our level already lower than top. - - for (;;) { // Raise top level. - int top = topLevel.get(); - - if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel)) - break; - } - } - - /** - * Key. - */ - private class KeyImpl implements Key { - /** */ - private long meta; - - /** */ - private Object tmpKey; - - /** - * @return Meta pointer for the key. - */ - public long address() { - return meta; - } - - /** - * @param val Value. - */ - @Override public void add(Value val) { - int size = val.size(); - - long valPtr = allocate(size + 12); - - val.copyTo(valPtr + 12); - - valueSize(valPtr, size); - - long nextVal; - - do { - nextVal = value(meta); - - nextValue(valPtr, nextVal); - } - while(!casValue(meta, nextVal, valPtr)); - } - } - } - - /** - * Task input. - */ - private class Input implements GridHadoopTaskInput { - /** */ - private long metaPtr = heads; - - /** */ - private final Reader keyReader; - - /** */ - private final Reader valReader; - - /** - * @param taskCtx Task context. - * @throws IgniteCheckedException If failed. 
- */ - private Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - keyReader = new Reader(taskCtx.keySerialization()); - valReader = new Reader(taskCtx.valueSerialization()); - } - - /** {@inheritDoc} */ - @Override public boolean next() { - metaPtr = nextMeta(metaPtr, 0); - - return metaPtr != 0; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return keyReader.readKey(metaPtr); - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - return new ValueIterator(value(metaPtr), valReader); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - keyReader.close(); - valReader.close(); - } - } - - /** - * Grouped input using grouping comparator. - */ - private class GroupedInput implements GridHadoopTaskInput { - /** */ - private final Comparator<Object> grpCmp; - - /** */ - private final Input in; - - /** */ - private Object prevKey; - - /** */ - private Object nextKey; - - /** */ - private final GridLongList vals = new GridLongList(); - - /** - * @param grpCmp Grouping comparator. - * @param in Input. - */ - private GroupedInput(Comparator<Object> grpCmp, Input in) { - this.grpCmp = grpCmp; - this.in = in; - } - - /** {@inheritDoc} */ - @Override public boolean next() { - if (prevKey == null) { // First call. - if (!in.next()) - return false; - - prevKey = in.key(); - - assert prevKey != null; - - in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison. - - vals.add(value(in.metaPtr)); - } - else { - if (in.metaPtr == 0) // We reached the end of the input. - return false; - - vals.clear(); - - vals.add(value(in.metaPtr)); - - in.keyReader.resetReusedObject(prevKey); // Switch key instances. - - prevKey = nextKey; - } - - while (in.next()) { // Fill with head value pointers with equal keys. - if (grpCmp.compare(prevKey, nextKey = in.key()) == 0) - vals.add(value(in.metaPtr)); - else - break; - } - - assert !vals.isEmpty(); - - return true; - } - - /** {@inheritDoc} */ - @Override public Object key() { - return prevKey; - } - - /** {@inheritDoc} */ - @Override public Iterator<?> values() { - assert !vals.isEmpty(); - - final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader); - - return new Iterator<Object>() { - /** */ - private int idx; - - @Override public boolean hasNext() { - if (!valIter.hasNext()) { - if (++idx == vals.size()) - return false; - - valIter.head(vals.get(idx)); - - assert valIter.hasNext(); - } - - return true; - } - - @Override public Object next() { - return valIter.next(); - } - - @Override public void remove() { - valIter.remove(); - } - }; - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - in.close(); - } - } -}
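Both the deleted class above and its renamed successor later in this commit pick a node's index level by counting trailing one-bits of a random int, so level k is chosen with probability 2^-(k+1), and the level never exceeds 32 (which is why HEADS_SIZE reserves 33 next-pointer slots). Below is a minimal, self-contained sketch of that distribution; the class and method names are illustrative only, not part of this commit.

import java.util.Random;

public class SkipLevelSketch {
    /** Same trailing-ones trick as randomLevel() in the skip list above. */
    static int randomLevel(Random rnd) {
        int x = rnd.nextInt();

        int level = 0;

        while ((x & 1) != 0) { // Count sequential 1 bits.
            level++;

            x >>>= 1;
        }

        return level;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);

        int[] hist = new int[33]; // Levels 0..32 inclusive.

        for (int i = 0; i < 1_000_000; i++)
            hist[randomLevel(rnd)]++;

        // Expect counts near 500000, 250000, 125000, ...: P(level = k) = 2^-(k+1).
        for (int k = 0; k < 6; k++)
            System.out.printf("level %d: %d%n", k, hist[k]);
    }
}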
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java new file mode 100644 index 0000000..65d9268 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Multimap for map reduce intermediate results. + */ +public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase { + /** */ + private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING); + + /** */ + private volatile AtomicLongArray oldTbl; + + /** */ + private volatile AtomicLongArray newTbl; + + /** */ + private final AtomicInteger keys = new AtomicInteger(); + + /** */ + private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>(); + + /** */ + private final AtomicInteger inputs = new AtomicInteger(); + + /** + * @param jobInfo Job info. + * @param mem Memory. + * @param cap Initial capacity. + */ + public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { + super(jobInfo, mem); + + assert U.isPow2(cap); + + newTbl = oldTbl = new AtomicLongArray(cap); + } + + /** + * @return Number of keys. + */ + public long keys() { + int res = keys.get(); + + for (AdderImpl adder : adders) + res += adder.locKeys.get(); + + return res; + } + + /** + * @return Current table capacity. + */ + @Override public int capacity() { + return oldTbl.length(); + } + + /** + * @return Adder object. + * @param ctx Task context. 
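+ * @throws IgniteCheckedException If failed.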
+     */
+    @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+        if (inputs.get() != 0)
+            throw new IllegalStateException("Active inputs.");
+
+        if (state.get() == State.CLOSING)
+            throw new IllegalStateException("Closed.");
+
+        return new AdderImpl(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        assert inputs.get() == 0 : inputs.get();
+        assert adders.isEmpty() : adders.size();
+
+        state(State.READING_WRITING, State.CLOSING);
+
+        if (keys() == 0)
+            return;
+
+        super.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long meta(int idx) {
+        return oldTbl.get(idx);
+    }
+
+    /**
+     * Incrementally visits all the keys and values in the map.
+     *
+     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+     * @param v Visitor.
+     * @return {@code false} If visiting was impossible due to rehashing.
+     */
+    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+        if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
+            assert state.get() != State.CLOSING;
+
+            return false; // Cannot visit while rehashing happens.
+        }
+
+        AtomicLongArray tbl0 = oldTbl;
+
+        for (int i = 0; i < tbl0.length(); i++) {
+            long meta = tbl0.get(i);
+
+            while (meta != 0) {
+                long valPtr = value(meta);
+
+                long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+
+                if (valPtr != lastVisited) {
+                    v.onKey(key(meta), keySize(meta));
+
+                    lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
+
+                    do {
+                        v.onValue(valPtr + 12, valueSize(valPtr));
+
+                        valPtr = nextValue(valPtr);
+                    }
+                    while (valPtr != lastVisited);
+                }
+
+                meta = collision(meta);
+            }
+        }
+
+        state(State.VISITING, State.READING_WRITING);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        inputs.incrementAndGet();
+
+        if (!adders.isEmpty())
+            throw new IllegalStateException("Active adders.");
+
+        State s = state.get();
+
+        if (s == State.CLOSING)
+            throw new IllegalStateException("Closed.");
+
+        assert s != State.REHASHING;
+
+        return new Input(taskCtx) {
+            @Override public void close() throws IgniteCheckedException {
+                if (inputs.decrementAndGet() < 0)
+                    throw new IllegalStateException();
+
+                super.close();
+            }
+        };
+    }
+
+    /**
+     * @param fromTbl Table.
+     */
+    private void rehashIfNeeded(AtomicLongArray fromTbl) {
+        if (fromTbl.length() == Integer.MAX_VALUE)
+            return;
+
+        long keys0 = keys();
+
+        if (keys0 < 3 * (fromTbl.length() >>> 2)) // Rehash only when the key count reaches 3/4 of capacity.
+            return;
+
+        if (fromTbl != newTbl) // Check if someone else has already done the job.
+            return;
+
+        if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) {
+            assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash.
+
+            return;
+        }
+
+        if (fromTbl != newTbl) { // Double check.
+            state(State.REHASHING, State.READING_WRITING); // Switch back.
+
+            return;
+        }
+
+        // Calculate new table capacity.
+        int newLen = fromTbl.length();
+
+        do {
+            newLen <<= 1;
+        }
+        while (newLen < keys0);
+
+        if (keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4.
+            newLen <<= 1;
+
+        // This is our target table for rehashing.
+        AtomicLongArray toTbl = new AtomicLongArray(newLen);
+
+        // Make the new table visible before rehashing.
+        newTbl = toTbl;
+
+        // Rehash.
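+        // (The migration below collects each bucket's collision chain and re-links
+        // it into the new table from the last entry to the first, so readers never
+        // observe a half-moved chain; an emptied bucket is marked -1, meaning
+        // "moved", and a failed CAS on a bucket retries the same index because new
+        // keys may have appeared there concurrently.)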
+ int newMask = newLen - 1; + + long failedMeta = 0; + + GridLongList collisions = new GridLongList(16); + + for (int i = 0; i < fromTbl.length(); i++) { // Scan source table. + long meta = fromTbl.get(i); + + assert meta != -1; + + if (meta == 0) { // No entry. + failedMeta = 0; + + if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved. + i--; // Retry. + + continue; + } + + do { // Collect all the collisions before the last one failed to nullify or 0. + collisions.add(meta); + + meta = collision(meta); + } + while (meta != failedMeta); + + do { // Go from the last to the first to avoid 'in-flight' state for meta entries. + meta = collisions.remove(); + + int addr = keyHash(meta) & newMask; + + for (;;) { // Move meta entry to the new table. + long toCollision = toTbl.get(addr); + + collision(meta, toCollision); + + if (toTbl.compareAndSet(addr, toCollision, meta)) + break; + } + } + while (!collisions.isEmpty()); + + // Here 'meta' will be a root pointer in old table. + if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved. + failedMeta = meta; + + i--; // Retry the same address in table because new keys were added. + } + else + failedMeta = 0; + } + + // Now old and new tables will be the same again. + oldTbl = toTbl; + + state(State.REHASHING, State.READING_WRITING); + } + + /** + * Switch state. + * + * @param oldState Expected state. + * @param newState New state. + */ + private void state(State oldState, State newState) { + if (!state.compareAndSet(oldState, newState)) + throw new IllegalStateException(); + } + + /** + * @param meta Meta pointer. + * @return Value pointer. + */ + @Override protected long value(long meta) { + return mem.readLongVolatile(meta + 16); + } + + /** + * @param meta Meta pointer. + * @param oldValPtr Old value. + * @param newValPtr New value. + * @return {@code true} If succeeded. + */ + private boolean casValue(long meta, long oldValPtr, long newValPtr) { + return mem.casLong(meta + 16, oldValPtr, newValPtr); + } + + /** + * @param meta Meta pointer. + * @return Collision pointer. + */ + @Override protected long collision(long meta) { + return mem.readLongVolatile(meta + 24); + } + + /** + * @param meta Meta pointer. + * @param collision Collision pointer. + */ + @Override protected void collision(long meta, long collision) { + assert meta != collision : meta; + + mem.writeLongVolatile(meta + 24, collision); + } + + /** + * @param meta Meta pointer. + * @return Last visited value pointer. + */ + private long lastVisitedValue(long meta) { + return mem.readLong(meta + 32); + } + + /** + * @param meta Meta pointer. + * @param valPtr Last visited value pointer. + */ + private void lastVisitedValue(long meta, long valPtr) { + mem.writeLong(meta + 32, valPtr); + } + + /** + * Adder. Must not be shared between threads. + */ + private class AdderImpl extends AdderBase { + /** */ + private final Reader keyReader; + + /** */ + private final AtomicInteger locKeys = new AtomicInteger(); + + /** */ + private final Random rnd = new GridRandom(); + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { + super(ctx); + + keyReader = new Reader(keySer); + + rehashIfNeeded(oldTbl); + + adders.add(this); + } + + /** + * @param in Data input. + * @param reuse Reusable key. + * @return Key. + * @throws IgniteCheckedException If failed. 
+     */
+    @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+        KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+        k.tmpKey = keySer.read(in, k.tmpKey);
+
+        k.meta = add(k.tmpKey, null);
+
+        return k;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Object key, Object val) throws IgniteCheckedException {
+        A.notNull(val, "val");
+
+        add(key, val);
+    }
+
+    /**
+     * @param tbl Table.
+     */
+    private void incrementKeys(AtomicLongArray tbl) {
+        locKeys.lazySet(locKeys.get() + 1);
+
+        if (rnd.nextInt(tbl.length()) < 512)
+            rehashIfNeeded(tbl);
+    }
+
+    /**
+     * @param keyHash Key hash.
+     * @param keySize Key size.
+     * @param keyPtr Key pointer.
+     * @param valPtr Value page pointer.
+     * @param collisionPtr Pointer to meta with hash collision.
+     * @param lastVisitedVal Last visited value pointer.
+     * @return Created meta page pointer.
+     */
+    private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
+        long meta = allocate(40);
+
+        mem.writeInt(meta, keyHash);
+        mem.writeInt(meta + 4, keySize);
+        mem.writeLong(meta + 8, keyPtr);
+        mem.writeLong(meta + 16, valPtr);
+        mem.writeLong(meta + 24, collisionPtr);
+        mem.writeLong(meta + 32, lastVisitedVal);
+
+        return meta;
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @return Updated or created meta page pointer.
+     * @throws IgniteCheckedException If failed.
+     */
+    private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+        AtomicLongArray tbl = oldTbl;
+
+        int keyHash = U.hash(key.hashCode());
+
+        long newMetaPtr = 0;
+
+        long valPtr = 0;
+
+        if (val != null) {
+            valPtr = write(12, val, valSer);
+            int valSize = writtenSize() - 12;
+
+            valueSize(valPtr, valSize);
+        }
+
+        for (AtomicLongArray old = null;;) {
+            int addr = keyHash & (tbl.length() - 1);
+
+            long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
+
+            if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
+                AtomicLongArray n = newTbl; // Need to read newTbl first here.
+                AtomicLongArray o = oldTbl;
+
+                tbl = tbl == o ? n : o; // Trying to get the oldest table that is newer than ours.
+
+                old = null;
+
+                continue;
+            }
+
+            if (metaPtrRoot != 0) { // Not an empty slot.
+                long metaPtr = metaPtrRoot;
+
+                do { // Scan all the collisions.
+                    if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
+                        if (newMetaPtr != 0) // Deallocate new meta if one was allocated.
+                            localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to its pointer.
+
+                        if (valPtr != 0) { // Add value if it exists.
+                            long nextValPtr;
+
+                            // Values are linked into a stack-like structure:
+                            // replace the last value in meta with ours and link it as next.
+                            do {
+                                nextValPtr = value(metaPtr);
+
+                                nextValue(valPtr, nextValPtr);
+                            }
+                            while (!casValue(metaPtr, nextValPtr, valPtr));
+                        }
+
+                        return metaPtr;
+                    }
+
+                    metaPtr = collision(metaPtr);
+                }
+                while (metaPtr != 0);
+
+                // We did not find our key here, so check if it was moved by rehashing to the new table.
+                if (old == null) { // If the old table is already set, then we will just try to update it.
+                    AtomicLongArray n = newTbl;
+
+                    if (n != tbl) { // Rehashing happens, try to find the key in the new table but preserve the old one.
+                        old = tbl;
+                        tbl = n;
+
+                        continue;
+                    }
+                }
+            }
+
+            if (old != null) { // We just checked the new table and found our key in neither it nor the old one.
+                tbl = old; // Try to add the new key to the old table.
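+                // (While a rehash is in flight a key may live in either table, so the
+                // scan above tries the table it started with and then the newer one;
+                // only when both miss does the insert fall back to the older table,
+                // whose bucket the rehash will migrate or has already marked as -1,
+                // which sends this loop around again.)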
+ + addr = keyHash & (tbl.length() - 1); + + old = null; + } + + if (newMetaPtr == 0) { // Allocate new meta page. + long keyPtr = write(0, key, keySer); + int keySize = writtenSize(); + + if (valPtr != 0) + nextValue(valPtr, 0); + + newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0); + } + else // Update new meta with root pointer collision. + collision(newMetaPtr, metaPtrRoot); + + if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with new one. + incrementKeys(tbl); + + return newMetaPtr; + } + } + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + if (!adders.remove(this)) + throw new IllegalStateException(); + + keys.addAndGet(locKeys.get()); // Here we have race and #keys() method can return wrong result but it is ok. + + super.close(); + } + + /** + * Key. + */ + private class KeyImpl implements Key { + /** */ + private long meta; + + /** */ + private Object tmpKey; + + /** + * @return Meta pointer for the key. + */ + public long address() { + return meta; + } + + /** + * @param val Value. + */ + @Override public void add(Value val) { + int size = val.size(); + + long valPtr = allocate(size + 12); + + val.copyTo(valPtr + 12); + + valueSize(valPtr, size); + + long nextVal; + + do { + nextVal = value(meta); + + nextValue(valPtr, nextVal); + } + while(!casValue(meta, nextVal, valPtr)); + } + } + } + + /** + * Current map state. + */ + private enum State { + /** */ + REHASHING, + + /** */ + VISITING, + + /** */ + READING_WRITING, + + /** */ + CLOSING + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java new file mode 100644 index 0000000..f524bdc --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +/** + * Hash multimap. 
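+ * Unlike HadoopConcurrentHashMultimap, state is kept in a plain long[] table
+ * with no synchronization, so an instance is evidently meant to be used from
+ * a single thread at a time.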
+ */ +public class HadoopHashMultimap extends HadoopHashMultimapBase { + /** */ + private long[] tbl; + + /** */ + private int keys; + + /** + * @param jobInfo Job info. + * @param mem Memory. + * @param cap Initial capacity. + */ + public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { + super(jobInfo, mem); + + assert U.isPow2(cap) : cap; + + tbl = new long[cap]; + } + + /** {@inheritDoc} */ + @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { + return new AdderImpl(ctx); + } + + /** + * Rehash. + */ + private void rehash() { + long[] newTbl = new long[tbl.length << 1]; + + int newMask = newTbl.length - 1; + + for (long meta : tbl) { + while (meta != 0) { + long collision = collision(meta); + + int idx = keyHash(meta) & newMask; + + collision(meta, newTbl[idx]); + + newTbl[idx] = meta; + + meta = collision; + } + } + + tbl = newTbl; + } + + /** + * @return Keys count. + */ + public int keys() { + return keys; + } + + /** {@inheritDoc} */ + @Override public int capacity() { + return tbl.length; + } + + /** {@inheritDoc} */ + @Override protected long meta(int idx) { + return tbl[idx]; + } + + /** + * Adder. + */ + private class AdderImpl extends AdderBase { + /** */ + private final Reader keyReader; + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { + super(ctx); + + keyReader = new Reader(keySer); + } + + /** + * @param keyHash Key hash. + * @param keySize Key size. + * @param keyPtr Key pointer. + * @param valPtr Value page pointer. + * @param collisionPtr Pointer to meta with hash collision. + * @return Created meta page pointer. + */ + private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) { + long meta = allocate(32); + + mem.writeInt(meta, keyHash); + mem.writeInt(meta + 4, keySize); + mem.writeLong(meta + 8, keyPtr); + mem.writeLong(meta + 16, valPtr); + mem.writeLong(meta + 24, collisionPtr); + + return meta; + } + + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) throws IgniteCheckedException { + A.notNull(val, "val"); + + int keyHash = U.hash(key.hashCode()); + + // Write value. + long valPtr = write(12, val, valSer); + int valSize = writtenSize() - 12; + + valueSize(valPtr, valSize); + + // Find position in table. + int idx = keyHash & (tbl.length - 1); + + long meta = tbl[idx]; + + // Search for our key in collisions. + while (meta != 0) { + if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key. + nextValue(valPtr, value(meta)); + + value(meta, valPtr); + + return; + } + + meta = collision(meta); + } + + // Write key. 
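+            // (A miss: the key record is written without a size prefix, since the size
+            // lives in the 32-byte meta page created below, and the fresh meta is
+            // prepended to this bucket's collision chain; the (tbl.length >>> 2) * 3
+            // check grows the table once occupancy exceeds 3/4.)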
+ long keyPtr = write(0, key, keySer); + int keySize = writtenSize(); + + nextValue(valPtr, 0); + + tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]); + + if (++keys > (tbl.length >>> 2) * 3) + rehash(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java new file mode 100644 index 0000000..16aa673 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; + +import java.util.*; + +/** + * Base class for hash multimaps. + */ +public abstract class HadoopHashMultimapBase extends HadoopMultimapBase { + /** + * @param jobInfo Job info. + * @param mem Memory. + */ + protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { + super(jobInfo, mem); + } + + /** {@inheritDoc} */ + @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { + throw new UnsupportedOperationException("visit"); + } + + /** {@inheritDoc} */ + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { + return new Input(taskCtx); + } + + /** + * @return Hash table capacity. + */ + public abstract int capacity(); + + /** + * @param idx Index in hash table. + * @return Meta page pointer. + */ + protected abstract long meta(int idx); + + /** + * @param meta Meta pointer. + * @return Key hash. + */ + protected int keyHash(long meta) { + return mem.readInt(meta); + } + + /** + * @param meta Meta pointer. + * @return Key size. + */ + protected int keySize(long meta) { + return mem.readInt(meta + 4); + } + + /** + * @param meta Meta pointer. + * @return Key pointer. + */ + protected long key(long meta) { + return mem.readLong(meta + 8); + } + + /** + * @param meta Meta pointer. + * @return Value pointer. + */ + protected long value(long meta) { + return mem.readLong(meta + 16); + } + /** + * @param meta Meta pointer. + * @param val Value pointer. 
+ */ + protected void value(long meta, long val) { + mem.writeLong(meta + 16, val); + } + + /** + * @param meta Meta pointer. + * @return Collision pointer. + */ + protected long collision(long meta) { + return mem.readLong(meta + 24); + } + + /** + * @param meta Meta pointer. + * @param collision Collision pointer. + */ + protected void collision(long meta, long collision) { + assert meta != collision : meta; + + mem.writeLong(meta + 24, collision); + } + + /** + * Reader for key and value. + */ + protected class Reader extends ReaderBase { + /** + * @param ser Serialization. + */ + protected Reader(HadoopSerialization ser) { + super(ser); + } + + /** + * @param meta Meta pointer. + * @return Key. + */ + public Object readKey(long meta) { + assert meta > 0 : meta; + + try { + return read(key(meta), keySize(meta)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + /** + * Task input. + */ + protected class Input implements HadoopTaskInput { + /** */ + private int idx = -1; + + /** */ + private long metaPtr; + + /** */ + private final int cap; + + /** */ + private final Reader keyReader; + + /** */ + private final Reader valReader; + + /** + * @param taskCtx Task context. + * @throws IgniteCheckedException If failed. + */ + public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException { + cap = capacity(); + + keyReader = new Reader(taskCtx.keySerialization()); + valReader = new Reader(taskCtx.valueSerialization()); + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (metaPtr != 0) { + metaPtr = collision(metaPtr); + + if (metaPtr != 0) + return true; + } + + while (++idx < cap) { // Scan table. + metaPtr = meta(idx); + + if (metaPtr != 0) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return keyReader.readKey(metaPtr); + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + return new ValueIterator(value(metaPtr), valReader); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + keyReader.close(); + valReader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java new file mode 100644 index 0000000..5def6d3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Multimap for hadoop intermediate results. + */ +@SuppressWarnings("PublicInnerClass") +public interface HadoopMultimap extends AutoCloseable { + /** + * Incrementally visits all the keys and values in the map. + * + * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning. + * @param v Visitor. + * @return {@code false} If visiting was impossible. + */ + public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException; + + /** + * @param ctx Task context. + * @return Adder. + * @throws IgniteCheckedException If failed. + */ + public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException; + + /** + * @param taskCtx Task context. + * @return Task input. + * @throws IgniteCheckedException If failed. + */ + public HadoopTaskInput input(HadoopTaskContext taskCtx) + throws IgniteCheckedException; + + /** {@inheritDoc} */ + @Override public void close(); + + /** + * Adder. + */ + public interface Adder extends HadoopTaskOutput { + /** + * @param in Data input. + * @param reuse Reusable key. + * @return Key. + * @throws IgniteCheckedException If failed. + */ + public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException; + } + + /** + * Key add values to. + */ + public interface Key { + /** + * @param val Value. + */ + public void add(Value val); + } + + /** + * Value. + */ + public interface Value { + /** + * @return Size in bytes. + */ + public int size(); + + /** + * @param ptr Pointer. + */ + public void copyTo(long ptr); + } + + /** + * Key and values visitor. + */ + public interface Visitor { + /** + * @param keyPtr Key pointer. + * @param keySize Key size. + */ + public void onKey(long keyPtr, int keySize) throws IgniteCheckedException; + + /** + * @param valPtr Value pointer. + * @param valSize Value size. + */ + public void onValue(long valPtr, int valSize) throws IgniteCheckedException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java new file mode 100644 index 0000000..7f332aa --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.streams.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*; + +/** + * Base class for all multimaps. + */ +public abstract class HadoopMultimapBase implements HadoopMultimap { + /** */ + protected final GridUnsafeMemory mem; + + /** */ + protected final int pageSize; + + /** */ + private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>(); + + /** + * @param jobInfo Job info. + * @param mem Memory. + */ + protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { + assert jobInfo != null; + assert mem != null; + + this.mem = mem; + + pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024); + } + + /** + * @param ptrs Page pointers. + */ + private void deallocate(GridLongList ptrs) { + while (!ptrs.isEmpty()) + mem.release(ptrs.remove(), ptrs.remove()); + } + + /** + * @param valPtr Value page pointer. + * @param nextValPtr Next value page pointer. + */ + protected void nextValue(long valPtr, long nextValPtr) { + mem.writeLong(valPtr, nextValPtr); + } + + /** + * @param valPtr Value page pointer. + * @return Next value page pointer. + */ + protected long nextValue(long valPtr) { + return mem.readLong(valPtr); + } + + /** + * @param valPtr Value page pointer. + * @param size Size. + */ + protected void valueSize(long valPtr, int size) { + mem.writeInt(valPtr + 8, size); + } + + /** + * @param valPtr Value page pointer. + * @return Value size. + */ + protected int valueSize(long valPtr) { + return mem.readInt(valPtr + 8); + } + + /** {@inheritDoc} */ + @Override public void close() { + for (GridLongList list : allPages) + deallocate(list); + } + + /** + * Reader for key and value. + */ + protected class ReaderBase implements AutoCloseable { + /** */ + private Object tmp; + + /** */ + private final HadoopSerialization ser; + + /** */ + private final HadoopDataInStream in = new HadoopDataInStream(mem); + + /** + * @param ser Serialization. + */ + protected ReaderBase(HadoopSerialization ser) { + assert ser != null; + + this.ser = ser; + } + + /** + * @param valPtr Value page pointer. + * @return Value. + */ + public Object readValue(long valPtr) { + assert valPtr > 0 : valPtr; + + try { + return read(valPtr + 12, valueSize(valPtr)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * Resets temporary object to the given one. + * + * @param tmp Temporary object for reuse. + */ + public void resetReusedObject(Object tmp) { + this.tmp = tmp; + } + + /** + * @param ptr Pointer. + * @param size Object size. + * @return Object. 
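+ * @throws IgniteCheckedException If failed.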
+ */ + protected Object read(long ptr, long size) throws IgniteCheckedException { + in.buffer().set(ptr, size); + + tmp = ser.read(in, tmp); + + return tmp; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + ser.close(); + } + } + + /** + * Base class for adders. + */ + protected abstract class AdderBase implements Adder { + /** */ + protected final HadoopSerialization keySer; + + /** */ + protected final HadoopSerialization valSer; + + /** */ + private final HadoopDataOutStream out; + + /** */ + private long writeStart; + + /** Size and pointer pairs list. */ + private final GridLongList pages = new GridLongList(16); + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException { + valSer = ctx.valueSerialization(); + keySer = ctx.keySerialization(); + + out = new HadoopDataOutStream(mem) { + @Override public long move(long size) { + long ptr = super.move(size); + + if (ptr == 0) // Was not able to move - not enough free space. + ptr = allocateNextPage(size); + + assert ptr != 0; + + return ptr; + } + }; + } + + /** + * @param requestedSize Requested size. + * @return Next write pointer. + */ + private long allocateNextPage(long requestedSize) { + int writtenSize = writtenSize(); + + long newPageSize = Math.max(writtenSize + requestedSize, pageSize); + long newPagePtr = mem.allocate(newPageSize); + + pages.add(newPageSize); + pages.add(newPagePtr); + + HadoopOffheapBuffer b = out.buffer(); + + b.set(newPagePtr, newPageSize); + + if (writtenSize != 0) { + mem.copyMemory(writeStart, newPagePtr, writtenSize); + + b.move(writtenSize); + } + + writeStart = newPagePtr; + + return b.move(requestedSize); + } + + /** + * @return Fixed pointer. + */ + private long fixAlignment() { + HadoopOffheapBuffer b = out.buffer(); + + long ptr = b.pointer(); + + if ((ptr & 7L) != 0) { // Address is not aligned by octet. + ptr = (ptr + 8L) & ~7L; + + b.pointer(ptr); + } + + return ptr; + } + + /** + * @param off Offset. + * @param o Object. + * @return Page pointer. + * @throws IgniteCheckedException If failed. + */ + protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException { + writeStart = fixAlignment(); + + if (off != 0) + out.move(off); + + ser.write(out, o); + + return writeStart; + } + + /** + * @param size Size. + * @return Pointer. + */ + protected long allocate(int size) { + writeStart = fixAlignment(); + + out.move(size); + + return writeStart; + } + + /** + * Rewinds local allocation pointer to the given pointer if possible. + * + * @param ptr Pointer. + */ + protected void localDeallocate(long ptr) { + HadoopOffheapBuffer b = out.buffer(); + + if (b.isInside(ptr)) + b.pointer(ptr); + else + b.reset(); + } + + /** + * @return Written size. + */ + protected int writtenSize() { + return (int)(out.buffer().pointer() - writeStart); + } + + /** {@inheritDoc} */ + @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + allPages.add(pages); + + keySer.close(); + valSer.close(); + } + } + + /** + * Iterator over values. + */ + protected class ValueIterator implements Iterator<Object> { + /** */ + private long valPtr; + + /** */ + private final ReaderBase valReader; + + /** + * @param valPtr Value page pointer. + * @param valReader Value reader. 
+ */ + protected ValueIterator(long valPtr, ReaderBase valReader) { + this.valPtr = valPtr; + this.valReader = valReader; + } + + /** + * @param valPtr Head value pointer. + */ + public void head(long valPtr) { + this.valPtr = valPtr; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return valPtr != 0; + } + + /** {@inheritDoc} */ + @Override public Object next() { + if (!hasNext()) + throw new NoSuchElementException(); + + Object res = valReader.readValue(valPtr); + + valPtr = nextValue(valPtr); + + return res; + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java new file mode 100644 index 0000000..69aa7a7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java @@ -0,0 +1,726 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Skip list. + */ +public class HadoopSkipList extends HadoopMultimapBase { + /** */ + private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive. + + /** Top level. */ + private final AtomicInteger topLevel = new AtomicInteger(-1); + + /** Heads for all the lists. */ + private final long heads; + + /** */ + private final AtomicBoolean visitGuard = new AtomicBoolean(); + + /** + * @param jobInfo Job info. + * @param mem Memory. 
+ */ + public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) { + super(jobInfo, mem); + + heads = mem.allocate(HEADS_SIZE, true); + } + + /** {@inheritDoc} */ + @Override public void close() { + super.close(); + + mem.release(heads, HEADS_SIZE); + } + + /** {@inheritDoc} */ + @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { + if (!visitGuard.compareAndSet(false, true)) + return false; + + for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) { + long valPtr = value(meta); + + long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta); + + if (valPtr != lastVisited) { + long k = key(meta); + + v.onKey(k + 4, keySize(k)); + + lastVisitedValue(meta, valPtr); // Set it to the first value in chain. + + do { + v.onValue(valPtr + 12, valueSize(valPtr)); + + valPtr = nextValue(valPtr); + } + while (valPtr != lastVisited); + } + } + + visitGuard.lazySet(false); + + return true; + } + + /** {@inheritDoc} */ + @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException { + return new AdderImpl(ctx); + } + + /** {@inheritDoc} */ + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { + Input in = new Input(taskCtx); + + Comparator<Object> grpCmp = taskCtx.groupComparator(); + + if (grpCmp != null) + return new GroupedInput(grpCmp, in); + + return in; + } + + /** + * @param meta Meta pointer. + * @return Key pointer. + */ + private long key(long meta) { + return mem.readLong(meta); + } + + /** + * @param meta Meta pointer. + * @param key Key pointer. + */ + private void key(long meta, long key) { + mem.writeLong(meta, key); + } + + /** + * @param meta Meta pointer. + * @return Value pointer. + */ + private long value(long meta) { + return mem.readLongVolatile(meta + 8); + } + + /** + * @param meta Meta pointer. + * @param valPtr Value pointer. + */ + private void value(long meta, long valPtr) { + mem.writeLongVolatile(meta + 8, valPtr); + } + + /** + * @param meta Meta pointer. + * @param oldValPtr Old first value pointer. + * @param newValPtr New first value pointer. + * @return {@code true} If operation succeeded. + */ + private boolean casValue(long meta, long oldValPtr, long newValPtr) { + return mem.casLong(meta + 8, oldValPtr, newValPtr); + } + + /** + * @param meta Meta pointer. + * @return Last visited value pointer. + */ + private long lastVisitedValue(long meta) { + return mem.readLong(meta + 16); + } + + /** + * @param meta Meta pointer. + * @param valPtr Last visited value pointer. + */ + private void lastVisitedValue(long meta, long valPtr) { + mem.writeLong(meta + 16, valPtr); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @return Next meta pointer. + */ + private long nextMeta(long meta, int level) { + assert meta > 0 : meta; + + return mem.readLongVolatile(meta + 24 + 8 * level); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @param oldNext Old next meta pointer. + * @param newNext New next meta pointer. + * @return {@code true} If operation succeeded. + */ + private boolean casNextMeta(long meta, int level, long oldNext, long newNext) { + assert meta > 0 : meta; + + return mem.casLong(meta + 24 + 8 * level, oldNext, newNext); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @param nextMeta Next meta. 
+     */
+    private void nextMeta(long meta, int level, long nextMeta) {
+        assert meta != 0;
+
+        mem.writeLong(meta + 24 + 8 * level, nextMeta);
+    }
+
+    /**
+     * @param keyPtr Key pointer.
+     * @return Key size.
+     */
+    private int keySize(long keyPtr) {
+        return mem.readInt(keyPtr);
+    }
+
+    /**
+     * @param keyPtr Key pointer.
+     * @param keySize Key size.
+     */
+    private void keySize(long keyPtr, int keySize) {
+        mem.writeInt(keyPtr, keySize);
+    }
+
+    /**
+     * @param rnd Random.
+     * @return Next level.
+     */
+    public static int randomLevel(Random rnd) {
+        int x = rnd.nextInt();
+
+        int level = 0;
+
+        while ((x & 1) != 0) { // Count sequential 1 bits.
+            level++;
+
+            x >>>= 1;
+        }
+
+        return level;
+    }
+
+    /**
+     * Reader.
+     */
+    private class Reader extends ReaderBase {
+        /**
+         * @param ser Serialization.
+         */
+        protected Reader(HadoopSerialization ser) {
+            super(ser);
+        }
+
+        /**
+         * @param meta Meta pointer.
+         * @return Key.
+         */
+        public Object readKey(long meta) {
+            assert meta > 0 : meta;
+
+            long k = key(meta);
+
+            try {
+                return read(k + 4, keySize(k));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * Adder.
+     */
+    private class AdderImpl extends AdderBase {
+        /** */
+        private final Comparator<Object> cmp;
+
+        /** */
+        private final Random rnd = new GridRandom();
+
+        /** */
+        private final GridLongList stack = new GridLongList(16);
+
+        /** */
+        private final Reader keyReader;
+
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+            super(ctx);
+
+            keyReader = new Reader(keySer);
+
+            cmp = ctx.sortComparator();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) throws IgniteCheckedException {
+            A.notNull(val, "val");
+
+            add(key, val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+            k.tmpKey = keySer.read(in, k.tmpKey);
+
+            k.meta = add(k.tmpKey, null);
+
+            return k;
+        }
+
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @param level Level.
+         * @return Meta pointer.
+         */
+        private long createMeta(long key, long val, int level) {
+            int size = 32 + 8 * level;
+
+            long meta = allocate(size);
+
+            key(meta, key);
+            value(meta, val);
+            lastVisitedValue(meta, 0L);
+
+            for (int i = 32; i < size; i += 8) // Fill with 0.
+                mem.writeLong(meta + i, 0L);
+
+            return meta;
+        }
+
+        /**
+         * @param key Key.
+         * @return Pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private long writeKey(Object key) throws IgniteCheckedException {
+            long keyPtr = write(4, key, keySer);
+            int keySize = writtenSize() - 4;
+
+            keySize(keyPtr, keySize);
+
+            return keyPtr;
+        }
+
+        /**
+         * @param prevMeta Previous meta.
+         * @param meta Next meta.
+         */
+        private void stackPush(long prevMeta, long meta) {
+            stack.add(prevMeta);
+            stack.add(meta);
+        }
+
+        /**
+         * Drops last remembered frame from the stack.
+         */
+        private void stackPop() {
+            stack.pop(2);
+        }
+
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @return Meta pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+            assert key != null;
+
+            stack.clear();
+
+            long valPtr = 0;
+
+            if (val != null) { // Write value.
+                valPtr = write(12, val, valSer);
+                int valSize = writtenSize() - 12;
+
+                nextValue(valPtr, 0);
+                valueSize(valPtr, valSize);
+            }
+
+            long keyPtr = 0;
+            long newMeta = 0;
+            int newMetaLevel = -1;
+
+            long prevMeta = heads;
+            int level = topLevel.get();
+            long meta = level < 0 ? 0 : nextMeta(heads, level);
+
+            for (;;) {
+                if (level < 0) { // We did not find our key, trying to add new meta.
+                    if (keyPtr == 0) { // Write key and create meta only once.
+                        keyPtr = writeKey(key);
+
+                        newMetaLevel = randomLevel(rnd);
+                        newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
+                    }
+
+                    nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
+
+                    if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
+                        laceUp(key, newMeta, newMetaLevel);
+
+                        return newMeta;
+                    }
+                    else { // Add failed, need to check out what was added by another thread.
+                        meta = nextMeta(prevMeta, level = 0);
+
+                        stackPop();
+                    }
+                }
+
+                int cmpRes = cmp(key, meta);
+
+                if (cmpRes == 0) { // Key found.
+                    if (newMeta != 0) // Deallocate if we've allocated something.
+                        localDeallocate(keyPtr);
+
+                    if (valPtr == 0) // Only key needs to be added.
+                        return meta;
+
+                    for (;;) { // Add value for the key found.
+                        long nextVal = value(meta);
+
+                        nextValue(valPtr, nextVal);
+
+                        if (casValue(meta, nextVal, valPtr))
+                            return meta;
+                    }
+                }
+
+                assert cmpRes != 0;
+
+                if (cmpRes > 0) { // Go right.
+                    prevMeta = meta;
+                    meta = nextMeta(meta, level);
+
+                    if (meta != 0) // If nothing to the right then go down.
+                        continue;
+                }
+
+                while (--level >= 0) { // Go down.
+                    stackPush(prevMeta, meta); // Remember the path.
+
+                    long nextMeta = nextMeta(prevMeta, level);
+
+                    if (nextMeta != meta) { // If the meta is the same as on upper level go deeper.
+                        meta = nextMeta;
+
+                        assert meta != 0;
+
+                        break;
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param key Key.
+         * @param meta Meta pointer.
+         * @return Comparison result.
+         */
+        @SuppressWarnings("unchecked")
+        private int cmp(Object key, long meta) {
+            assert meta != 0;
+
+            return cmp.compare(key, keyReader.readKey(meta));
+        }
+
+        /**
+         * Adds appropriate index links between metas.
+         *
+         * @param newMeta Just added meta.
+         * @param newMetaLevel New level.
+         */
+        private void laceUp(Object key, long newMeta, int newMetaLevel) {
+            for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
+                long prevMeta = heads;
+                long meta = 0;
+
+                if (!stack.isEmpty()) { // Get the path back.
+                    meta = stack.remove();
+                    prevMeta = stack.remove();
+                }
+
+                for (;;) {
+                    nextMeta(newMeta, level, meta);
+
+                    if (casNextMeta(prevMeta, level, meta, newMeta))
+                        break;
+
+                    long oldMeta = meta;
+
+                    meta = nextMeta(prevMeta, level); // Reread meta.
+
+                    for (;;) {
+                        int cmpRes = cmp(key, meta);
+
+                        if (cmpRes > 0) { // Go right.
+                            prevMeta = meta;
+                            meta = nextMeta(prevMeta, level);
+
+                            if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
+                                continue;
+                        }
+
+                        assert cmpRes != 0; // Two different metas with equal keys must be impossible.
+
+                        break; // Retry cas.
+                    }
+                }
+            }
+
+            if (!stack.isEmpty())
+                return; // Our level already lower than top.
+
+            for (;;) { // Raise top level.
+                int top = topLevel.get();
+
+                if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
+                    break;
+            }
+        }
+
+        /**
+         * Key.
+         */
+        private class KeyImpl implements Key {
+            /** */
+            private long meta;
+
+            /** */
+            private Object tmpKey;
+
+            /**
+             * @return Meta pointer for the key.
+             */
+            public long address() {
+                return meta;
+            }
+
+            /**
+             * @param val Value.
+             */
+            @Override public void add(Value val) {
+                int size = val.size();
+
+                long valPtr = allocate(size + 12);
+
+                val.copyTo(valPtr + 12);
+
+                valueSize(valPtr, size);
+
+                long nextVal;
+
+                do {
+                    nextVal = value(meta);
+
+                    nextValue(valPtr, nextVal);
+                }
+                while (!casValue(meta, nextVal, valPtr));
+            }
+        }
+    }
+
+    /**
+     * Task input.
+     */
+    private class Input implements HadoopTaskInput {
+        /** */
+        private long metaPtr = heads;
+
+        /** */
+        private final Reader keyReader;
+
+        /** */
+        private final Reader valReader;
+
+        /**
+         * @param taskCtx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+            keyReader = new Reader(taskCtx.keySerialization());
+            valReader = new Reader(taskCtx.valueSerialization());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            metaPtr = nextMeta(metaPtr, 0);
+
+            return metaPtr != 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return keyReader.readKey(metaPtr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            return new ValueIterator(value(metaPtr), valReader);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            keyReader.close();
+            valReader.close();
+        }
+    }
+
+    /**
+     * Grouped input using grouping comparator.
+     */
+    private class GroupedInput implements HadoopTaskInput {
+        /** */
+        private final Comparator<Object> grpCmp;
+
+        /** */
+        private final Input in;
+
+        /** */
+        private Object prevKey;
+
+        /** */
+        private Object nextKey;
+
+        /** */
+        private final GridLongList vals = new GridLongList();
+
+        /**
+         * @param grpCmp Grouping comparator.
+         * @param in Input.
+         */
+        private GroupedInput(Comparator<Object> grpCmp, Input in) {
+            this.grpCmp = grpCmp;
+            this.in = in;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (prevKey == null) { // First call.
+                if (!in.next())
+                    return false;
+
+                prevKey = in.key();
+
+                assert prevKey != null;
+
+                in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
+
+                vals.add(value(in.metaPtr));
+            }
+            else {
+                if (in.metaPtr == 0) // We reached the end of the input.
+                    return false;
+
+                vals.clear();
+
+                vals.add(value(in.metaPtr));
+
+                in.keyReader.resetReusedObject(prevKey); // Switch key instances.
+
+                prevKey = nextKey;
+            }
+
+            while (in.next()) { // Fill with head value pointers with equal keys.
+                if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
+                    vals.add(value(in.metaPtr));
+                else
+                    break;
+            }
+
+            assert !vals.isEmpty();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return prevKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            assert !vals.isEmpty();
+
+            final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
+
+            return new Iterator<Object>() {
+                /** */
+                private int idx;
+
+                @Override public boolean hasNext() {
+                    if (!valIter.hasNext()) {
+                        if (++idx == vals.size())
+                            return false;
+
+                        valIter.head(vals.get(idx));
+
+                        assert valIter.hasNext();
+                    }
+
+                    return true;
+                }
+
+                @Override public Object next() {
+                    return valIter.next();
+                }
+
+                @Override public void remove() {
+                    valIter.remove();
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            in.close();
+        }
+    }
+}
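
A note on randomLevel(): counting the trailing 1 bits of a uniformly random int yields level L with probability 2^-(L+1), the classic p = 1/2 skip list geometry, and the 32-bit width of the int caps the level at 32, which is exactly why HEADS_SIZE reserves 33 next-pointer slots (levels 0 through 32). The following standalone sketch is a hypothetical harness, not part of this patch: it copies the same logic and checks the distribution empirically.

import java.util.Random;

/** Sketch: reproduces the level logic of HadoopSkipList.randomLevel()
 *  and verifies that level L occurs with frequency ~2^-(L+1). */
public class RandomLevelDemo {
    /** Same logic as the patched method: counts sequential trailing 1 bits. */
    static int randomLevel(Random rnd) {
        int x = rnd.nextInt();

        int level = 0;

        while ((x & 1) != 0) {
            level++;

            x >>>= 1;
        }

        return level;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);

        int trials = 1_000_000;
        int[] counts = new int[33]; // Levels 0..32, matching HEADS_SIZE = 24 + 33 * 8.

        for (int i = 0; i < trials; i++)
            counts[randomLevel(rnd)]++;

        for (int lvl = 0; lvl <= 5; lvl++)
            System.out.printf("level %d: %.4f (expected %.4f)%n",
                lvl, counts[lvl] / (double)trials, Math.pow(2, -(lvl + 1)));
    }
}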
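Duplicate keys are likewise handled without locks: each key's meta record holds a head pointer (at meta + 8) to a chain of value blocks, where each block is an 8-byte next pointer plus a 4-byte length followed by the serialized bytes, hence the valPtr + 12 offsets throughout. Both add() and KeyImpl.add() prepend to that chain with a CAS-retry loop on the head. Below is a minimal on-heap model of that prepend pattern, an assumed simplification that stands in AtomicReference for mem.casLong on off-heap memory; it is not code from the patch.

import java.util.concurrent.atomic.AtomicReference;

/** Sketch: lock-free prepend to a per-key value chain, modeled on-heap. */
class ValueChain<T> {
    /** Node mirrors a value block: payload plus the 'next value' link. */
    static final class Node<T> {
        final T val;
        Node<T> next;

        Node(T val) { this.val = val; }
    }

    /** Head plays the role of the value pointer stored in the key's meta. */
    private final AtomicReference<Node<T>> head = new AtomicReference<>();

    /** Link the new node to the current head, then CAS; retry on contention. */
    void add(T val) {
        Node<T> n = new Node<>(val);

        Node<T> h;

        do {
            h = head.get();

            n.next = h;                      // nextValue(valPtr, nextVal)
        }
        while (!head.compareAndSet(h, n));   // casValue(meta, nextVal, valPtr)
    }
}

Under concurrent adders the resulting chain order is arbitrary, which matches the multimap's contract: values for one key are iterated in no particular order.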