http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java deleted file mode 100644 index 545c1b8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java +++ /dev/null @@ -1,593 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.io.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.thread.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Shuffle job. - */ -public class GridHadoopShuffleJob<T> implements AutoCloseable { - /** */ - private static final int MSG_BUF_SIZE = 128 * 1024; - - /** */ - private final GridHadoopJob job; - - /** */ - private final GridUnsafeMemory mem; - - /** */ - private final boolean needPartitioner; - - /** Collection of task contexts for each reduce task. */ - private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap<>(); - - /** Reducers addresses. */ - private T[] reduceAddrs; - - /** Local reducers address. */ - private final T locReduceAddr; - - /** */ - private final GridHadoopShuffleMessage[] msgs; - - /** */ - private final AtomicReferenceArray<GridHadoopMultimap> maps; - - /** */ - private volatile IgniteInClosure2X<T, GridHadoopShuffleMessage> io; - - /** */ - protected ConcurrentMap<Long, IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs = - new ConcurrentHashMap<>(); - - /** */ - private volatile GridWorker snd; - - /** Latch for remote addresses waiting. 
*/ - private final CountDownLatch ioInitLatch = new CountDownLatch(1); - - /** Finished flag. Set on flush or close. */ - private volatile boolean flushed; - - /** */ - private final IgniteLogger log; - - /** - * @param locReduceAddr Local reducer address. - * @param log Logger. - * @param job Job. - * @param mem Memory. - * @param totalReducerCnt Amount of reducers in the Job. - * @param locReducers Reducers will work on current node. - * @throws IgniteCheckedException If error. - */ - public GridHadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, - int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { - this.locReduceAddr = locReduceAddr; - this.job = job; - this.mem = mem; - this.log = log.getLogger(GridHadoopShuffleJob.class); - - if (!F.isEmpty(locReducers)) { - for (int rdc : locReducers) { - GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, job.id(), rdc, 0, null); - - reducersCtx.put(rdc, job.getTaskContext(taskInfo)); - } - } - - needPartitioner = totalReducerCnt > 1; - - maps = new AtomicReferenceArray<>(totalReducerCnt); - msgs = new GridHadoopShuffleMessage[totalReducerCnt]; - } - - /** - * @param reduceAddrs Addresses of reducers. - * @return {@code True} if addresses were initialized by this call. - */ - public boolean initializeReduceAddresses(T[] reduceAddrs) { - if (this.reduceAddrs == null) { - this.reduceAddrs = reduceAddrs; - - return true; - } - - return false; - } - - /** - * @return {@code True} if reducers addresses were initialized. - */ - public boolean reducersInitialized() { - return reduceAddrs != null; - } - - /** - * @param gridName Grid name. - * @param io IO Closure for sending messages. - */ - @SuppressWarnings("BusyWait") - public void startSending(String gridName, IgniteInClosure2X<T, GridHadoopShuffleMessage> io) { - assert snd == null; - assert io != null; - - this.io = io; - - if (!flushed) { - snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) { - @Override protected void body() throws InterruptedException { - try { - while (!isCancelled()) { - Thread.sleep(5); - - collectUpdatesAndSend(false); - } - } - catch (IgniteCheckedException e) { - throw new IllegalStateException(e); - } - } - }; - - new IgniteThread(snd).start(); - } - - ioInitLatch.countDown(); - } - - /** - * @param maps Maps. - * @param idx Index. - * @return Map. - */ - private GridHadoopMultimap getOrCreateMap(AtomicReferenceArray<GridHadoopMultimap> maps, int idx) { - GridHadoopMultimap map = maps.get(idx); - - if (map == null) { // Create new map. - map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ? - new GridHadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)): - new GridHadoopSkipList(job.info(), mem); - - if (!maps.compareAndSet(idx, null, map)) { - map.close(); - - return maps.get(idx); - } - } - - return map; - } - - /** - * @param msg Message. - * @throws IgniteCheckedException Exception. - */ - public void onShuffleMessage(GridHadoopShuffleMessage msg) throws IgniteCheckedException { - assert msg.buffer() != null; - assert msg.offset() > 0; - - GridHadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()); - - GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(taskCtx.counters(), null); - - perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); - - GridHadoopMultimap map = getOrCreateMap(maps, msg.reducer()); - - // Add data from message to the map. 
- try (GridHadoopMultimap.Adder adder = map.startAdding(taskCtx)) { - final GridUnsafeDataInput dataInput = new GridUnsafeDataInput(); - final UnsafeValue val = new UnsafeValue(msg.buffer()); - - msg.visit(new GridHadoopShuffleMessage.Visitor() { - /** */ - private GridHadoopMultimap.Key key; - - @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException { - dataInput.bytes(buf, off, off + len); - - key = adder.addKey(dataInput, key); - } - - @Override public void onValue(byte[] buf, int off, int len) { - val.off = off; - val.size = len; - - key.add(val); - } - }); - } - } - - /** - * @param ack Shuffle ack. - */ - @SuppressWarnings("ConstantConditions") - public void onShuffleAck(GridHadoopShuffleAck ack) { - IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup = sentMsgs.get(ack.id()); - - if (tup != null) - tup.get2().onDone(); - else - log.warning("Received shuffle ack for not registered shuffle id: " + ack); - } - - /** - * Unsafe value. - */ - private static class UnsafeValue implements GridHadoopMultimap.Value { - /** */ - private final byte[] buf; - - /** */ - private int off; - - /** */ - private int size; - - /** - * @param buf Buffer. - */ - private UnsafeValue(byte[] buf) { - assert buf != null; - - this.buf = buf; - } - - /** */ - @Override public int size() { - return size; - } - - /** */ - @Override public void copyTo(long ptr) { - UNSAFE.copyMemory(buf, BYTE_ARR_OFF + off, null, ptr, size); - } - } - - /** - * Sends map updates to remote reducers. - */ - private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { - for (int i = 0; i < maps.length(); i++) { - GridHadoopMultimap map = maps.get(i); - - if (map == null || locReduceAddr.equals(reduceAddrs[i])) - continue; // Skip empty map and local node. - - if (msgs[i] == null) - msgs[i] = new GridHadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE); - - final int idx = i; - - map.visit(false, new GridHadoopMultimap.Visitor() { - /** */ - private long keyPtr; - - /** */ - private int keySize; - - /** */ - private boolean keyAdded; - - /** {@inheritDoc} */ - @Override public void onKey(long keyPtr, int keySize) { - this.keyPtr = keyPtr; - this.keySize = keySize; - - keyAdded = false; - } - - private boolean tryAdd(long valPtr, int valSize) { - GridHadoopShuffleMessage msg = msgs[idx]; - - if (!keyAdded) { // Add key and value. - int size = keySize + valSize; - - if (!msg.available(size, false)) - return false; - - msg.addKey(keyPtr, keySize); - msg.addValue(valPtr, valSize); - - keyAdded = true; - - return true; - } - - if (!msg.available(valSize, true)) - return false; - - msg.addValue(valPtr, valSize); - - return true; - } - - /** {@inheritDoc} */ - @Override public void onValue(long valPtr, int valSize) { - if (tryAdd(valPtr, valSize)) - return; - - send(idx, keySize + valSize); - - keyAdded = false; - - if (!tryAdd(valPtr, valSize)) - throw new IllegalStateException(); - } - }); - - if (flush && msgs[i].offset() != 0) - send(i, 0); - } - } - - /** - * @param idx Index of message. - * @param newBufMinSize Min new buffer size. 
- */ - private void send(final int idx, int newBufMinSize) { - final GridFutureAdapterEx<?> fut = new GridFutureAdapterEx<>(); - - GridHadoopShuffleMessage msg = msgs[idx]; - - final long msgId = msg.id(); - - IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> old = sentMsgs.putIfAbsent(msgId, - new IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>(msg, fut)); - - assert old == null; - - try { - io.apply(reduceAddrs[idx], msg); - } - catch (GridClosureException e) { - fut.onDone(U.unwrap(e)); - } - - fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - f.get(); - - // Clean up the future from map only if there was no exception. - // Otherwise flush() should fail. - sentMsgs.remove(msgId); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message.", e); - } - } - }); - - msgs[idx] = newBufMinSize == 0 ? null : new GridHadoopShuffleMessage(job.id(), idx, - Math.max(MSG_BUF_SIZE, newBufMinSize)); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - if (snd != null) { - snd.cancel(); - - try { - snd.join(); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); - } - } - - close(maps); - } - - /** - * @param maps Maps. - */ - private void close(AtomicReferenceArray<GridHadoopMultimap> maps) { - for (int i = 0; i < maps.length(); i++) { - GridHadoopMultimap map = maps.get(i); - - if (map != null) - map.close(); - } - } - - /** - * @return Future. - */ - @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> flush() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Flushing job " + job.id() + " on address " + locReduceAddr); - - flushed = true; - - if (maps.length() == 0) - return new GridFinishedFutureEx<>(); - - U.await(ioInitLatch); - - GridWorker snd0 = snd; - - if (snd0 != null) { - if (log.isDebugEnabled()) - log.debug("Cancelling sender thread."); - - snd0.cancel(); - - try { - snd0.join(); - - if (log.isDebugEnabled()) - log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id()); - } - catch (InterruptedException e) { - throw new IgniteInterruptedCheckedException(e); - } - } - - collectUpdatesAndSend(true); // With flush. - - if (log.isDebugEnabled()) - log.debug("Finished sending collected updates to remote reducers: " + job.id()); - - GridCompoundFuture fut = new GridCompoundFuture<>(); - - for (IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup : sentMsgs.values()) - fut.add(tup.get2()); - - fut.markInitialized(); - - if (log.isDebugEnabled()) - log.debug("Collected futures to compound futures for flush: " + sentMsgs.size()); - - return fut; - } - - /** - * @param taskCtx Task context. - * @return Output. - * @throws IgniteCheckedException If failed. - */ - public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - switch (taskCtx.taskInfo().type()) { - case MAP: - assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined."; - - case COMBINE: - return new PartitionedOutput(taskCtx); - - default: - throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type()); - } - } - - /** - * @param taskCtx Task context. - * @return Input. - * @throws IgniteCheckedException If failed. 
- */ - @SuppressWarnings("unchecked") - public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - switch (taskCtx.taskInfo().type()) { - case REDUCE: - int reducer = taskCtx.taskInfo().taskNumber(); - - GridHadoopMultimap m = maps.get(reducer); - - if (m != null) - return m.input(taskCtx); - - return new GridHadoopTaskInput() { // Empty input. - @Override public boolean next() { - return false; - } - - @Override public Object key() { - throw new IllegalStateException(); - } - - @Override public Iterator<?> values() { - throw new IllegalStateException(); - } - - @Override public void close() { - // No-op. - } - }; - - default: - throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type()); - } - } - - /** - * Partitioned output. - */ - private class PartitionedOutput implements GridHadoopTaskOutput { - /** */ - private final GridHadoopTaskOutput[] adders = new GridHadoopTaskOutput[maps.length()]; - - /** */ - private GridHadoopPartitioner partitioner; - - /** */ - private final GridHadoopTaskContext taskCtx; - - /** - * Constructor. - * @param taskCtx Task context. - */ - private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - this.taskCtx = taskCtx; - - if (needPartitioner) - partitioner = taskCtx.partitioner(); - } - - /** {@inheritDoc} */ - @Override public void write(Object key, Object val) throws IgniteCheckedException { - int part = 0; - - if (partitioner != null) { - part = partitioner.partition(key, val, adders.length); - - if (part < 0 || part >= adders.length) - throw new IgniteCheckedException("Invalid partition: " + part); - } - - GridHadoopTaskOutput out = adders[part]; - - if (out == null) - adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx); - - out.write(key, val); - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteCheckedException { - for (GridHadoopTaskOutput adder : adders) { - if (adder != null) - adder.close(); - } - } - } -}
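A note on the class deleted above (it is recreated verbatim as HadoopShuffleJob later in this commit): getOrCreateMap() initializes the per-reducer multimaps lazily by building a candidate map, publishing it with compareAndSet() on an AtomicReferenceArray, and closing the candidate if another thread won the race. Below is a minimal self-contained sketch of that idiom; the LazySlots name and the Supplier-based factory are illustrative stand-ins, not code from this commit.

import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;

/**
 * Sketch of the lazy slot-initialization idiom used by getOrCreateMap():
 * build a candidate, publish it with compareAndSet(), and dispose of the
 * candidate if another thread won the race. Names are illustrative.
 */
final class LazySlots<T extends AutoCloseable> {
    private final AtomicReferenceArray<T> slots;
    private final Supplier<T> factory;

    LazySlots(int size, Supplier<T> factory) {
        slots = new AtomicReferenceArray<>(size);
        this.factory = factory;
    }

    T getOrCreate(int idx) throws Exception {
        T cur = slots.get(idx);

        if (cur != null)
            return cur;

        T candidate = factory.get();

        if (slots.compareAndSet(idx, null, candidate))
            return candidate; // This thread published the slot.

        candidate.close(); // Lost the race: discard our instance.

        return slots.get(idx); // Read the winner's instance.
    }
}

Because the losing thread closes its own instance before re-reading the slot, at most one map per reducer ever becomes visible, which is what lets onShuffleMessage() and PartitionedOutput.write() share the maps array without locks.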
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java deleted file mode 100644 index 24ebc0c..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Shuffle message. - */ -public class GridHadoopShuffleMessage implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private static final AtomicLong ids = new AtomicLong(); - - /** */ - private static final byte MARKER_KEY = (byte)17; - - /** */ - private static final byte MARKER_VALUE = (byte)31; - - /** */ - @GridToStringInclude - private long msgId; - - /** */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** */ - @GridToStringInclude - private int reducer; - - /** */ - private byte[] buf; - - /** */ - @GridToStringInclude - private int off; - - /** - * - */ - public GridHadoopShuffleMessage() { - // No-op. - } - - /** - * @param size Size. - */ - public GridHadoopShuffleMessage(GridHadoopJobId jobId, int reducer, int size) { - assert jobId != null; - - buf = new byte[size]; - - this.jobId = jobId; - this.reducer = reducer; - - msgId = ids.incrementAndGet(); - } - - /** - * @return Message ID. - */ - public long id() { - return msgId; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Reducer. - */ - public int reducer() { - return reducer; - } - - /** - * @return Buffer. - */ - public byte[] buffer() { - return buf; - } - - /** - * @return Offset. - */ - public int offset() { - return off; - } - - /** - * @param size Size. - * @param valOnly Only value will be added.
- * @return {@code true} If this message can fit additional data of this size - */ - public boolean available(int size, boolean valOnly) { - size += valOnly ? 5 : 10; - - if (off + size > buf.length) { - if (off == 0) { // Resize if requested size is too big. - buf = new byte[size]; - - return true; - } - - return false; - } - - return true; - } - - /** - * @param keyPtr Key pointer. - * @param keySize Key size. - */ - public void addKey(long keyPtr, int keySize) { - add(MARKER_KEY, keyPtr, keySize); - } - - /** - * @param valPtr Value pointer. - * @param valSize Value size. - */ - public void addValue(long valPtr, int valSize) { - add(MARKER_VALUE, valPtr, valSize); - } - - /** - * @param marker Marker. - * @param ptr Pointer. - * @param size Size. - */ - private void add(byte marker, long ptr, int size) { - buf[off++] = marker; - - UNSAFE.putInt(buf, BYTE_ARR_OFF + off, size); - - off += 4; - - UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF + off, size); - - off += size; - } - - /** - * @param v Visitor. - */ - public void visit(Visitor v) throws IgniteCheckedException { - for (int i = 0; i < off;) { - byte marker = buf[i++]; - - int size = UNSAFE.getInt(buf, BYTE_ARR_OFF + i); - - i += 4; - - if (marker == MARKER_VALUE) - v.onValue(buf, i, size); - else if (marker == MARKER_KEY) - v.onKey(buf, i, size); - else - throw new IllegalStateException(); - - i += size; - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - out.writeLong(msgId); - out.writeInt(reducer); - out.writeInt(off); - U.writeByteArray(out, buf); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - - jobId.readExternal(in); - msgId = in.readLong(); - reducer = in.readInt(); - off = in.readInt(); - buf = U.readByteArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopShuffleMessage.class, this); - } - - /** - * Visitor. - */ - public static interface Visitor { - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. - */ - public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; - - /** - * @param buf Buffer. - * @param off Offset. - * @param len Length. 
- */ - public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java index 9880093..267316e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java @@ -37,7 +37,7 @@ import java.util.concurrent.*; */ public class HadoopShuffle extends HadoopComponent { /** */ - private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); + private final ConcurrentMap<GridHadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>(); /** */ protected final GridUnsafeMemory mem = new GridUnsafeMemory(0); @@ -49,7 +49,7 @@ public class HadoopShuffle extends HadoopComponent { ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, new IgniteBiPredicate<UUID, Object>() { @Override public boolean apply(UUID nodeId, Object msg) { - return onMessageReceived(nodeId, (GridHadoopMessage)msg); + return onMessageReceived(nodeId, (HadoopMessage)msg); } }); } @@ -60,7 +60,7 @@ public class HadoopShuffle extends HadoopComponent { * @param cancel If should cancel all ongoing activities. */ @Override public void stop(boolean cancel) { - for (GridHadoopShuffleJob job : jobs.values()) { + for (HadoopShuffleJob job : jobs.values()) { try { job.close(); } @@ -79,10 +79,10 @@ public class HadoopShuffle extends HadoopComponent { * @return Created shuffle job. * @throws IgniteCheckedException If job creation failed. */ - private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { + private HadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException { GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId); - GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log, + HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId())); UUID[] rdcAddrs = new UUID[plan.reducers()]; @@ -117,13 +117,13 @@ public class HadoopShuffle extends HadoopComponent { * @param jobId Task info. * @return Shuffle job. */ - private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { - GridHadoopShuffleJob<UUID> res = jobs.get(jobId); + private HadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException { + HadoopShuffleJob<UUID> res = jobs.get(jobId); if (res == null) { res = newJob(jobId); - GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); + HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res); if (old != null) { res.close(); @@ -142,10 +142,10 @@ public class HadoopShuffle extends HadoopComponent { * * @param shuffleJob Job to start sending for. 
*/ - private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) { + private void startSending(HadoopShuffleJob<UUID> shuffleJob) { shuffleJob.startSending(ctx.kernalContext().gridName(), - new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() { - @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException { + new IgniteInClosure2X<UUID, HadoopShuffleMessage>() { + @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException { send0(dest, msg); } } @@ -159,9 +159,9 @@ public class HadoopShuffle extends HadoopComponent { * @param msg Received message. * @return {@code True}. */ - public boolean onMessageReceived(UUID src, GridHadoopMessage msg) { - if (msg instanceof GridHadoopShuffleMessage) { - GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg; + public boolean onMessageReceived(UUID src, HadoopMessage msg) { + if (msg instanceof HadoopShuffleMessage) { + HadoopShuffleMessage m = (HadoopShuffleMessage)msg; try { job(m.jobId()).onShuffleMessage(m); @@ -172,14 +172,14 @@ public class HadoopShuffle extends HadoopComponent { try { // Reply with ack. - send0(src, new GridHadoopShuffleAck(m.id(), m.jobId())); + send0(src, new HadoopShuffleAck(m.id(), m.jobId())); } catch (IgniteCheckedException e) { U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e); } } - else if (msg instanceof GridHadoopShuffleAck) { - GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg; + else if (msg instanceof HadoopShuffleAck) { + HadoopShuffleAck m = (HadoopShuffleAck)msg; try { job(m.jobId()).onShuffleAck(m); @@ -215,7 +215,7 @@ public class HadoopShuffle extends HadoopComponent { * @param jobId Job id. */ public void jobFinished(GridHadoopJobId jobId) { - GridHadoopShuffleJob job = jobs.remove(jobId); + HadoopShuffleJob job = jobs.remove(jobId); if (job != null) { try { @@ -234,7 +234,7 @@ public class HadoopShuffle extends HadoopComponent { * @return Future. */ public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) { - GridHadoopShuffleJob job = jobs.get(jobId); + HadoopShuffleJob job = jobs.get(jobId); if (job == null) return new GridFinishedFutureEx<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java new file mode 100644 index 0000000..53ff2d1 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Acknowledgement message. + */ +public class HadoopShuffleAck implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + private long msgId; + + /** */ + @GridToStringInclude + private GridHadoopJobId jobId; + + /** + * + */ + public HadoopShuffleAck() { + // No-op. + } + + /** + * @param msgId Message ID. + */ + public HadoopShuffleAck(long msgId, GridHadoopJobId jobId) { + assert jobId != null; + + this.msgId = msgId; + this.jobId = jobId; + } + + /** + * @return Message ID. + */ + public long id() { + return msgId; + } + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + out.writeLong(msgId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new GridHadoopJobId(); + + jobId.readExternal(in); + msgId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java new file mode 100644 index 0000000..a75b34b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.thread.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * Shuffle job. + */ +public class HadoopShuffleJob<T> implements AutoCloseable { + /** */ + private static final int MSG_BUF_SIZE = 128 * 1024; + + /** */ + private final GridHadoopJob job; + + /** */ + private final GridUnsafeMemory mem; + + /** */ + private final boolean needPartitioner; + + /** Collection of task contexts for each reduce task. */ + private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap<>(); + + /** Reducers addresses. */ + private T[] reduceAddrs; + + /** Local reducers address. */ + private final T locReduceAddr; + + /** */ + private final HadoopShuffleMessage[] msgs; + + /** */ + private final AtomicReferenceArray<HadoopMultimap> maps; + + /** */ + private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io; + + /** */ + protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs = + new ConcurrentHashMap<>(); + + /** */ + private volatile GridWorker snd; + + /** Latch for remote addresses waiting. */ + private final CountDownLatch ioInitLatch = new CountDownLatch(1); + + /** Finished flag. Set on flush or close. */ + private volatile boolean flushed; + + /** */ + private final IgniteLogger log; + + /** + * @param locReduceAddr Local reducer address. + * @param log Logger. + * @param job Job. + * @param mem Memory. + * @param totalReducerCnt Amount of reducers in the Job. + * @param locReducers Reducers will work on current node. + * @throws IgniteCheckedException If error. + */ + public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, + int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { + this.locReduceAddr = locReduceAddr; + this.job = job; + this.mem = mem; + this.log = log.getLogger(HadoopShuffleJob.class); + + if (!F.isEmpty(locReducers)) { + for (int rdc : locReducers) { + GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, job.id(), rdc, 0, null); + + reducersCtx.put(rdc, job.getTaskContext(taskInfo)); + } + } + + needPartitioner = totalReducerCnt > 1; + + maps = new AtomicReferenceArray<>(totalReducerCnt); + msgs = new HadoopShuffleMessage[totalReducerCnt]; + } + + /** + * @param reduceAddrs Addresses of reducers. + * @return {@code True} if addresses were initialized by this call. 
+ */ + public boolean initializeReduceAddresses(T[] reduceAddrs) { + if (this.reduceAddrs == null) { + this.reduceAddrs = reduceAddrs; + + return true; + } + + return false; + } + + /** + * @return {@code True} if reducers addresses were initialized. + */ + public boolean reducersInitialized() { + return reduceAddrs != null; + } + + /** + * @param gridName Grid name. + * @param io IO Closure for sending messages. + */ + @SuppressWarnings("BusyWait") + public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) { + assert snd == null; + assert io != null; + + this.io = io; + + if (!flushed) { + snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) { + @Override protected void body() throws InterruptedException { + try { + while (!isCancelled()) { + Thread.sleep(5); + + collectUpdatesAndSend(false); + } + } + catch (IgniteCheckedException e) { + throw new IllegalStateException(e); + } + } + }; + + new IgniteThread(snd).start(); + } + + ioInitLatch.countDown(); + } + + /** + * @param maps Maps. + * @param idx Index. + * @return Map. + */ + private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) { + HadoopMultimap map = maps.get(idx); + + if (map == null) { // Create new map. + map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ? + new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)): + new HadoopSkipList(job.info(), mem); + + if (!maps.compareAndSet(idx, null, map)) { + map.close(); + + return maps.get(idx); + } + } + + return map; + } + + /** + * @param msg Message. + * @throws IgniteCheckedException Exception. + */ + public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException { + assert msg.buffer() != null; + assert msg.offset() > 0; + + GridHadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()); + + HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); + + perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); + + HadoopMultimap map = getOrCreateMap(maps, msg.reducer()); + + // Add data from message to the map. + try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) { + final GridUnsafeDataInput dataInput = new GridUnsafeDataInput(); + final UnsafeValue val = new UnsafeValue(msg.buffer()); + + msg.visit(new HadoopShuffleMessage.Visitor() { + /** */ + private HadoopMultimap.Key key; + + @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException { + dataInput.bytes(buf, off, off + len); + + key = adder.addKey(dataInput, key); + } + + @Override public void onValue(byte[] buf, int off, int len) { + val.off = off; + val.size = len; + + key.add(val); + } + }); + } + } + + /** + * @param ack Shuffle ack. + */ + @SuppressWarnings("ConstantConditions") + public void onShuffleAck(HadoopShuffleAck ack) { + IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>> tup = sentMsgs.get(ack.id()); + + if (tup != null) + tup.get2().onDone(); + else + log.warning("Received shuffle ack for not registered shuffle id: " + ack); + } + + /** + * Unsafe value. + */ + private static class UnsafeValue implements HadoopMultimap.Value { + /** */ + private final byte[] buf; + + /** */ + private int off; + + /** */ + private int size; + + /** + * @param buf Buffer. 
+ */ + private UnsafeValue(byte[] buf) { + assert buf != null; + + this.buf = buf; + } + + /** */ + @Override public int size() { + return size; + } + + /** */ + @Override public void copyTo(long ptr) { + UNSAFE.copyMemory(buf, BYTE_ARR_OFF + off, null, ptr, size); + } + } + + /** + * Sends map updates to remote reducers. + */ + private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { + for (int i = 0; i < maps.length(); i++) { + HadoopMultimap map = maps.get(i); + + if (map == null || locReduceAddr.equals(reduceAddrs[i])) + continue; // Skip empty map and local node. + + if (msgs[i] == null) + msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE); + + final int idx = i; + + map.visit(false, new HadoopMultimap.Visitor() { + /** */ + private long keyPtr; + + /** */ + private int keySize; + + /** */ + private boolean keyAdded; + + /** {@inheritDoc} */ + @Override public void onKey(long keyPtr, int keySize) { + this.keyPtr = keyPtr; + this.keySize = keySize; + + keyAdded = false; + } + + private boolean tryAdd(long valPtr, int valSize) { + HadoopShuffleMessage msg = msgs[idx]; + + if (!keyAdded) { // Add key and value. + int size = keySize + valSize; + + if (!msg.available(size, false)) + return false; + + msg.addKey(keyPtr, keySize); + msg.addValue(valPtr, valSize); + + keyAdded = true; + + return true; + } + + if (!msg.available(valSize, true)) + return false; + + msg.addValue(valPtr, valSize); + + return true; + } + + /** {@inheritDoc} */ + @Override public void onValue(long valPtr, int valSize) { + if (tryAdd(valPtr, valSize)) + return; + + send(idx, keySize + valSize); + + keyAdded = false; + + if (!tryAdd(valPtr, valSize)) + throw new IllegalStateException(); + } + }); + + if (flush && msgs[i].offset() != 0) + send(i, 0); + } + } + + /** + * @param idx Index of message. + * @param newBufMinSize Min new buffer size. + */ + private void send(final int idx, int newBufMinSize) { + final GridFutureAdapterEx<?> fut = new GridFutureAdapterEx<>(); + + HadoopShuffleMessage msg = msgs[idx]; + + final long msgId = msg.id(); + + IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>> old = sentMsgs.putIfAbsent(msgId, + new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>>(msg, fut)); + + assert old == null; + + try { + io.apply(reduceAddrs[idx], msg); + } + catch (GridClosureException e) { + fut.onDone(U.unwrap(e)); + } + + fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + f.get(); + + // Clean up the future from map only if there was no exception. + // Otherwise flush() should fail. + sentMsgs.remove(msgId); + } + catch (IgniteCheckedException e) { + log.error("Failed to send message.", e); + } + } + }); + + msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx, + Math.max(MSG_BUF_SIZE, newBufMinSize)); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + if (snd != null) { + snd.cancel(); + + try { + snd.join(); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + } + + close(maps); + } + + /** + * @param maps Maps. + */ + private void close(AtomicReferenceArray<HadoopMultimap> maps) { + for (int i = 0; i < maps.length(); i++) { + HadoopMultimap map = maps.get(i); + + if (map != null) + map.close(); + } + } + + /** + * @return Future. 
+ */ + @SuppressWarnings("unchecked") + public IgniteInternalFuture<?> flush() throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Flushing job " + job.id() + " on address " + locReduceAddr); + + flushed = true; + + if (maps.length() == 0) + return new GridFinishedFutureEx<>(); + + U.await(ioInitLatch); + + GridWorker snd0 = snd; + + if (snd0 != null) { + if (log.isDebugEnabled()) + log.debug("Cancelling sender thread."); + + snd0.cancel(); + + try { + snd0.join(); + + if (log.isDebugEnabled()) + log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id()); + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } + } + + collectUpdatesAndSend(true); // With flush. + + if (log.isDebugEnabled()) + log.debug("Finished sending collected updates to remote reducers: " + job.id()); + + GridCompoundFuture fut = new GridCompoundFuture<>(); + + for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>> tup : sentMsgs.values()) + fut.add(tup.get2()); + + fut.markInitialized(); + + if (log.isDebugEnabled()) + log.debug("Collected futures to compound futures for flush: " + sentMsgs.size()); + + return fut; + } + + /** + * @param taskCtx Task context. + * @return Output. + * @throws IgniteCheckedException If failed. + */ + public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + switch (taskCtx.taskInfo().type()) { + case MAP: + assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined."; + + case COMBINE: + return new PartitionedOutput(taskCtx); + + default: + throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type()); + } + } + + /** + * @param taskCtx Task context. + * @return Input. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + switch (taskCtx.taskInfo().type()) { + case REDUCE: + int reducer = taskCtx.taskInfo().taskNumber(); + + HadoopMultimap m = maps.get(reducer); + + if (m != null) + return m.input(taskCtx); + + return new GridHadoopTaskInput() { // Empty input. + @Override public boolean next() { + return false; + } + + @Override public Object key() { + throw new IllegalStateException(); + } + + @Override public Iterator<?> values() { + throw new IllegalStateException(); + } + + @Override public void close() { + // No-op. + } + }; + + default: + throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type()); + } + } + + /** + * Partitioned output. + */ + private class PartitionedOutput implements GridHadoopTaskOutput { + /** */ + private final GridHadoopTaskOutput[] adders = new GridHadoopTaskOutput[maps.length()]; + + /** */ + private GridHadoopPartitioner partitioner; + + /** */ + private final GridHadoopTaskContext taskCtx; + + /** + * Constructor. + * @param taskCtx Task context. 
+ */ + private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + this.taskCtx = taskCtx; + + if (needPartitioner) + partitioner = taskCtx.partitioner(); + } + + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) throws IgniteCheckedException { + int part = 0; + + if (partitioner != null) { + part = partitioner.partition(key, val, adders.length); + + if (part < 0 || part >= adders.length) + throw new IgniteCheckedException("Invalid partition: " + part); + } + + GridHadoopTaskOutput out = adders[part]; + + if (out == null) + adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx); + + out.write(key, val); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + for (GridHadoopTaskOutput adder : adders) { + if (adder != null) + adder.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java new file mode 100644 index 0000000..d227e75 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * Shuffle message. + */ +public class HadoopShuffleMessage implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final AtomicLong ids = new AtomicLong(); + + /** */ + private static final byte MARKER_KEY = (byte)17; + + /** */ + private static final byte MARKER_VALUE = (byte)31; + + /** */ + @GridToStringInclude + private long msgId; + + /** */ + @GridToStringInclude + private GridHadoopJobId jobId; + + /** */ + @GridToStringInclude + private int reducer; + + /** */ + private byte[] buf; + + /** */ + @GridToStringInclude + private int off; + + /** + * + */ + public HadoopShuffleMessage() { + // No-op. + } + + /** + * @param size Size. 
*/ + public HadoopShuffleMessage(GridHadoopJobId jobId, int reducer, int size) { + assert jobId != null; + + buf = new byte[size]; + + this.jobId = jobId; + this.reducer = reducer; + + msgId = ids.incrementAndGet(); + } + + /** + * @return Message ID. + */ + public long id() { + return msgId; + } + + /** + * @return Job ID. + */ + public GridHadoopJobId jobId() { + return jobId; + } + + /** + * @return Reducer. + */ + public int reducer() { + return reducer; + } + + /** + * @return Buffer. + */ + public byte[] buffer() { + return buf; + } + + /** + * @return Offset. + */ + public int offset() { + return off; + } + + /** + * @param size Size. + * @param valOnly Only value will be added. + * @return {@code true} If this message can fit additional data of this size + */ + public boolean available(int size, boolean valOnly) { + size += valOnly ? 5 : 10; + + if (off + size > buf.length) { + if (off == 0) { // Resize if requested size is too big. + buf = new byte[size]; + + return true; + } + + return false; + } + + return true; + } + + /** + * @param keyPtr Key pointer. + * @param keySize Key size. + */ + public void addKey(long keyPtr, int keySize) { + add(MARKER_KEY, keyPtr, keySize); + } + + /** + * @param valPtr Value pointer. + * @param valSize Value size. + */ + public void addValue(long valPtr, int valSize) { + add(MARKER_VALUE, valPtr, valSize); + } + + /** + * @param marker Marker. + * @param ptr Pointer. + * @param size Size. + */ + private void add(byte marker, long ptr, int size) { + buf[off++] = marker; + + UNSAFE.putInt(buf, BYTE_ARR_OFF + off, size); + + off += 4; + + UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF + off, size); + + off += size; + } + + /** + * @param v Visitor. + */ + public void visit(Visitor v) throws IgniteCheckedException { + for (int i = 0; i < off;) { + byte marker = buf[i++]; + + int size = UNSAFE.getInt(buf, BYTE_ARR_OFF + i); + + i += 4; + + if (marker == MARKER_VALUE) + v.onValue(buf, i, size); + else if (marker == MARKER_KEY) + v.onKey(buf, i, size); + else + throw new IllegalStateException(); + + i += size; + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + out.writeLong(msgId); + out.writeInt(reducer); + out.writeInt(off); + U.writeByteArray(out, buf); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new GridHadoopJobId(); + + jobId.readExternal(in); + msgId = in.readLong(); + reducer = in.readInt(); + off = in.readInt(); + buf = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleMessage.class, this); + } + + /** + * Visitor. + */ + public static interface Visitor { + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + */ + public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length.
+ */ + public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimap.java deleted file mode 100644 index 32db722..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimap.java +++ /dev/null @@ -1,611 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.collections; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Multimap for map reduce intermediate results. - */ -public class GridHadoopConcurrentHashMultimap extends GridHadoopHashMultimapBase { - /** */ - private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING); - - /** */ - private volatile AtomicLongArray oldTbl; - - /** */ - private volatile AtomicLongArray newTbl; - - /** */ - private final AtomicInteger keys = new AtomicInteger(); - - /** */ - private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>(); - - /** */ - private final AtomicInteger inputs = new AtomicInteger(); - - /** - * @param jobInfo Job info. - * @param mem Memory. - * @param cap Initial capacity. - */ - public GridHadoopConcurrentHashMultimap(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) { - super(jobInfo, mem); - - assert U.isPow2(cap); - - newTbl = oldTbl = new AtomicLongArray(cap); - } - - /** - * @return Number of keys. - */ - public long keys() { - int res = keys.get(); - - for (AdderImpl adder : adders) - res += adder.locKeys.get(); - - return res; - } - - /** - * @return Current table capacity. - */ - @Override public int capacity() { - return oldTbl.length(); - } - - /** - * @return Adder object. - * @param ctx Task context. 
- */ - @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException { - if (inputs.get() != 0) - throw new IllegalStateException("Active inputs."); - - if (state.get() == State.CLOSING) - throw new IllegalStateException("Closed."); - - return new AdderImpl(ctx); - } - - /** {@inheritDoc} */ - @Override public void close() { - assert inputs.get() == 0 : inputs.get(); - assert adders.isEmpty() : adders.size(); - - state(State.READING_WRITING, State.CLOSING); - - if (keys() == 0) - return; - - super.close(); - } - - /** {@inheritDoc} */ - @Override protected long meta(int idx) { - return oldTbl.get(idx); - } - - /** - * Incrementally visits all the keys and values in the map. - * - * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning. - * @param v Visitor. - * @return {@code false} If visiting was impossible due to rehashing. - */ - @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { - if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) { - assert state.get() != State.CLOSING; - - return false; // Can not visit while rehashing happens. - } - - AtomicLongArray tbl0 = oldTbl; - - for (int i = 0; i < tbl0.length(); i++) { - long meta = tbl0.get(i); - - while (meta != 0) { - long valPtr = value(meta); - - long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta); - - if (valPtr != lastVisited) { - v.onKey(key(meta), keySize(meta)); - - lastVisitedValue(meta, valPtr); // Set it to the first value in chain. - - do { - v.onValue(valPtr + 12, valueSize(valPtr)); - - valPtr = nextValue(valPtr); - } - while (valPtr != lastVisited); - } - - meta = collision(meta); - } - } - - state(State.VISITING, State.READING_WRITING); - - return true; - } - - /** {@inheritDoc} */ - @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - inputs.incrementAndGet(); - - if (!adders.isEmpty()) - throw new IllegalStateException("Active adders."); - - State s = state.get(); - - if (s == State.CLOSING) - throw new IllegalStateException("Closed."); - - assert s != State.REHASHING; - - return new Input(taskCtx) { - @Override public void close() throws IgniteCheckedException { - if (inputs.decrementAndGet() < 0) - throw new IllegalStateException(); - - super.close(); - } - }; - } - - /** - * @param fromTbl Table. - */ - private void rehashIfNeeded(AtomicLongArray fromTbl) { - if (fromTbl.length() == Integer.MAX_VALUE) - return; - - long keys0 = keys(); - - if (keys0 < 3 * (fromTbl.length() >>> 2)) // New size has to be >= than 3/4 of capacity to rehash. - return; - - if (fromTbl != newTbl) // Check if someone else have done the job. - return; - - if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) { - assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash. - - return; - } - - if (fromTbl != newTbl) { // Double check. - state(State.REHASHING, State.READING_WRITING); // Switch back. - - return; - } - - // Calculate new table capacity. - int newLen = fromTbl.length(); - - do { - newLen <<= 1; - } - while (newLen < keys0); - - if (keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4. - newLen <<= 1; - - // This is our target table for rehashing. - AtomicLongArray toTbl = new AtomicLongArray(newLen); - - // Make the new table visible before rehashing. - newTbl = toTbl; - - // Rehash. 
-        int newMask = newLen - 1;
-
-        long failedMeta = 0;
-
-        GridLongList collisions = new GridLongList(16);
-
-        for (int i = 0; i < fromTbl.length(); i++) { // Scan source table.
-            long meta = fromTbl.get(i);
-
-            assert meta != -1;
-
-            if (meta == 0) { // No entry.
-                failedMeta = 0;
-
-                if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved.
-                    i--; // Retry.
-
-                continue;
-            }
-
-            do { // Collect the collision chain up to the previously failed meta (or 0).
-                collisions.add(meta);
-
-                meta = collision(meta);
-            }
-            while (meta != failedMeta);
-
-            do { // Go from the last to the first to avoid 'in-flight' state for meta entries.
-                meta = collisions.remove();
-
-                int addr = keyHash(meta) & newMask;
-
-                for (;;) { // Move meta entry to the new table.
-                    long toCollision = toTbl.get(addr);
-
-                    collision(meta, toCollision);
-
-                    if (toTbl.compareAndSet(addr, toCollision, meta))
-                        break;
-                }
-            }
-            while (!collisions.isEmpty());
-
-            // Here 'meta' will be a root pointer in the old table.
-            if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved.
-                failedMeta = meta;
-
-                i--; // Retry the same address in the table because new keys were added.
-            }
-            else
-                failedMeta = 0;
-        }
-
-        // Now old and new tables will be the same again.
-        oldTbl = toTbl;
-
-        state(State.REHASHING, State.READING_WRITING);
-    }
-
-    /**
-     * Switch state.
-     *
-     * @param oldState Expected state.
-     * @param newState New state.
-     */
-    private void state(State oldState, State newState) {
-        if (!state.compareAndSet(oldState, newState))
-            throw new IllegalStateException();
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Value pointer.
-     */
-    @Override protected long value(long meta) {
-        return mem.readLongVolatile(meta + 16);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param oldValPtr Old value.
-     * @param newValPtr New value.
-     * @return {@code true} If succeeded.
-     */
-    private boolean casValue(long meta, long oldValPtr, long newValPtr) {
-        return mem.casLong(meta + 16, oldValPtr, newValPtr);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Collision pointer.
-     */
-    @Override protected long collision(long meta) {
-        return mem.readLongVolatile(meta + 24);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param collision Collision pointer.
-     */
-    @Override protected void collision(long meta, long collision) {
-        assert meta != collision : meta;
-
-        mem.writeLongVolatile(meta + 24, collision);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Last visited value pointer.
-     */
-    private long lastVisitedValue(long meta) {
-        return mem.readLong(meta + 32);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param valPtr Last visited value pointer.
-     */
-    private void lastVisitedValue(long meta, long valPtr) {
-        mem.writeLong(meta + 32, valPtr);
-    }
-
-    /**
-     * Adder. Must not be shared between threads.
-     */
-    private class AdderImpl extends AdderBase {
-        /** */
-        private final Reader keyReader;
-
-        /** */
-        private final AtomicInteger locKeys = new AtomicInteger();
-
-        /** */
-        private final Random rnd = new GridRandom();
-
-        /**
-         * @param ctx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        private AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-            super(ctx);
-
-            keyReader = new Reader(keySer);
-
-            rehashIfNeeded(oldTbl);
-
-            adders.add(this);
-        }
-
-        /**
-         * @param in Data input.
-         * @param reuse Reusable key.
-         * @return Key.
-         * @throws IgniteCheckedException If failed.
-         */
-        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
-            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
-            k.tmpKey = keySer.read(in, k.tmpKey);
-
-            k.meta = add(k.tmpKey, null);
-
-            return k;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws IgniteCheckedException {
-            A.notNull(val, "val");
-
-            add(key, val);
-        }
-
-        /**
-         * @param tbl Table.
-         */
-        private void incrementKeys(AtomicLongArray tbl) {
-            locKeys.lazySet(locKeys.get() + 1);
-
-            if (rnd.nextInt(tbl.length()) < 512)
-                rehashIfNeeded(tbl);
-        }
-
-        /**
-         * @param keyHash Key hash.
-         * @param keySize Key size.
-         * @param keyPtr Key pointer.
-         * @param valPtr Value page pointer.
-         * @param collisionPtr Pointer to meta with hash collision.
-         * @param lastVisitedVal Last visited value pointer.
-         * @return Created meta page pointer.
-         */
-        private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
-            long meta = allocate(40);
-
-            mem.writeInt(meta, keyHash);
-            mem.writeInt(meta + 4, keySize);
-            mem.writeLong(meta + 8, keyPtr);
-            mem.writeLong(meta + 16, valPtr);
-            mem.writeLong(meta + 24, collisionPtr);
-            mem.writeLong(meta + 32, lastVisitedVal);
-
-            return meta;
-        }
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         * @return Updated or created meta page pointer.
-         * @throws IgniteCheckedException If failed.
-         */
-        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
-            AtomicLongArray tbl = oldTbl;
-
-            int keyHash = U.hash(key.hashCode());
-
-            long newMetaPtr = 0;
-
-            long valPtr = 0;
-
-            if (val != null) {
-                valPtr = write(12, val, valSer);
-                int valSize = writtenSize() - 12;
-
-                valueSize(valPtr, valSize);
-            }
-
-            for (AtomicLongArray old = null;;) {
-                int addr = keyHash & (tbl.length() - 1);
-
-                long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
-
-                if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
-                    AtomicLongArray n = newTbl; // Need to read newTbl first here.
-                    AtomicLongArray o = oldTbl;
-
-                    tbl = tbl == o ? n : o; // Take the older of the two tables that is still newer than ours.
-
-                    old = null;
-
-                    continue;
-                }
-
-                if (metaPtrRoot != 0) { // Not empty slot.
-                    long metaPtr = metaPtrRoot;
-
-                    do { // Scan all the collisions.
-                        if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
-                            if (newMetaPtr != 0) // Deallocate new meta if one was allocated.
-                                localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to its pointer.
-
-                            if (valPtr != 0) { // Add value if it exists.
-                                long nextValPtr;
-
-                                // Values are linked into a stack-like structure.
-                                // Replace the last value in meta with ours and link it as next.
-                                do {
-                                    nextValPtr = value(metaPtr);
-
-                                    nextValue(valPtr, nextValPtr);
-                                }
-                                while (!casValue(metaPtr, nextValPtr, valPtr));
-                            }
-
-                            return metaPtr;
-                        }
-
-                        metaPtr = collision(metaPtr);
-                    }
-                    while (metaPtr != 0);
-
-                    // Here we did not find our key, need to check if it was moved by rehashing to the new table.
-                    if (old == null) { // If the old table is already set, then we will just try to update it.
-                        AtomicLongArray n = newTbl;
-
-                        if (n != tbl) { // Rehashing happens, try to find the key in the new table but preserve the old one.
-                            old = tbl;
-                            tbl = n;
-
-                            continue;
-                        }
-                    }
-                }
-
-                if (old != null) { // We checked the new table but did not find our key there either.
-                    tbl = old; // Try to add the new key to the old table.
-
-                    addr = keyHash & (tbl.length() - 1);
-
-                    old = null;
-                }
-
-                if (newMetaPtr == 0) { // Allocate new meta page.
-                    long keyPtr = write(0, key, keySer);
-                    int keySize = writtenSize();
-
-                    if (valPtr != 0)
-                        nextValue(valPtr, 0);
-
-                    newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
-                }
-                else // Update new meta with root pointer collision.
-                    collision(newMetaPtr, metaPtrRoot);
-
-                if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with the new one.
-                    incrementKeys(tbl);
-
-                    return newMetaPtr;
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            if (!adders.remove(this))
-                throw new IllegalStateException();
-
-            keys.addAndGet(locKeys.get()); // There is a race here and #keys() can briefly return a stale result, but that is ok.
-
-            super.close();
-        }
-
-        /**
-         * Key.
-         */
-        private class KeyImpl implements Key {
-            /** */
-            private long meta;
-
-            /** */
-            private Object tmpKey;
-
-            /**
-             * @return Meta pointer for the key.
-             */
-            public long address() {
-                return meta;
-            }
-
-            /**
-             * @param val Value.
-             */
-            @Override public void add(Value val) {
-                int size = val.size();
-
-                long valPtr = allocate(size + 12);
-
-                val.copyTo(valPtr + 12);
-
-                valueSize(valPtr, size);
-
-                long nextVal;
-
-                do {
-                    nextVal = value(meta);
-
-                    nextValue(valPtr, nextVal);
-                }
-                while (!casValue(meta, nextVal, valPtr));
-            }
-        }
-    }
-
-    /**
-     * Current map state.
-     */
-    private enum State {
-        /** */
-        REHASHING,
-
-        /** */
-        VISITING,
-
-        /** */
-        READING_WRITING,
-
-        /** */
-        CLOSING
-    }
-}
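The heart of the class deleted above is the retry loop in add(), which never takes a lock: a writer links its new entry to the bucket's current root and publishes it with a single compareAndSet, retrying if another writer got there first, while rehashing uses the same CAS discipline with -1 as a "moved" marker. A rough on-heap analog of just the insert loop (class and method names invented; the real code works on raw off-heap pointers in an AtomicLongArray, chains multiple values under one key entry, and cooperates with concurrent rehashing, all of which this sketch omits):

import java.util.concurrent.atomic.AtomicReferenceArray;

public class CasMultimapSketch<K, V> {
    /** Immutable chain node: a (key, value) pair linked into its bucket's collision chain. */
    private static final class Node<K, V> {
        final K key;
        final V val;
        final Node<K, V> next;

        Node(K key, V val, Node<K, V> next) {
            this.key = key;
            this.val = val;
            this.next = next;
        }
    }

    private final AtomicReferenceArray<Node<K, V>> tbl;

    public CasMultimapSketch(int cap) {
        if ((cap & (cap - 1)) != 0)
            throw new IllegalArgumentException("Capacity must be a power of 2: " + cap);

        tbl = new AtomicReferenceArray<>(cap);
    }

    /** Lock-free insert: link the new node to the current root, then CAS; retry on contention. */
    public void put(K key, V val) {
        int idx = key.hashCode() & (tbl.length() - 1);

        for (;;) {
            Node<K, V> root = tbl.get(idx); // Read root pointer at this address.

            if (tbl.compareAndSet(idx, root, new Node<>(key, val, root)))
                return; // Published; other threads now see the new node.
            // CAS failed: another writer got in first, so re-read the root and retry.
        }
    }

    public static void main(String[] args) {
        CasMultimapSketch<String, Integer> m = new CasMultimapSketch<>(16);

        m.put("word", 1);
        m.put("word", 2); // Both values stay reachable through the bucket chain.
    }
}

The same publish-by-CAS idea explains the last-to-first move order in rehashIfNeeded(): entries are relinked into the target table before the source slot is marked moved, so readers never observe a half-transferred chain.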
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimap.java
deleted file mode 100644
index 2795b77..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimap.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * Hash multimap.
- */
-public class GridHadoopHashMultimap extends GridHadoopHashMultimapBase {
-    /** */
-    private long[] tbl;
-
-    /** */
-    private int keys;
-
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory.
-     * @param cap Initial capacity.
-     */
-    public GridHadoopHashMultimap(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
-        super(jobInfo, mem);
-
-        assert U.isPow2(cap) : cap;
-
-        tbl = new long[cap];
-    }
-
-    /** {@inheritDoc} */
-    @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-        return new AdderImpl(ctx);
-    }
-
-    /**
-     * Rehash.
-     */
-    private void rehash() {
-        long[] newTbl = new long[tbl.length << 1];
-
-        int newMask = newTbl.length - 1;
-
-        for (long meta : tbl) {
-            while (meta != 0) {
-                long collision = collision(meta);
-
-                int idx = keyHash(meta) & newMask;
-
-                collision(meta, newTbl[idx]);
-
-                newTbl[idx] = meta;
-
-                meta = collision;
-            }
-        }
-
-        tbl = newTbl;
-    }
-
-    /**
-     * @return Keys count.
-     */
-    public int keys() {
-        return keys;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int capacity() {
-        return tbl.length;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long meta(int idx) {
-        return tbl[idx];
-    }
-
-    /**
-     * Adder.
-     */
-    private class AdderImpl extends AdderBase {
-        /** */
-        private final Reader keyReader;
-
-        /**
-         * @param ctx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-            super(ctx);
-
-            keyReader = new Reader(keySer);
-        }
-
-        /**
-         * @param keyHash Key hash.
-         * @param keySize Key size.
-         * @param keyPtr Key pointer.
-         * @param valPtr Value page pointer.
-         * @param collisionPtr Pointer to meta with hash collision.
-         * @return Created meta page pointer.
-         */
-        private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
-            long meta = allocate(32);
-
-            mem.writeInt(meta, keyHash);
-            mem.writeInt(meta + 4, keySize);
-            mem.writeLong(meta + 8, keyPtr);
-            mem.writeLong(meta + 16, valPtr);
-            mem.writeLong(meta + 24, collisionPtr);
-
-            return meta;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws IgniteCheckedException {
-            A.notNull(val, "val");
-
-            int keyHash = U.hash(key.hashCode());
-
-            // Write value.
-            long valPtr = write(12, val, valSer);
-            int valSize = writtenSize() - 12;
-
-            valueSize(valPtr, valSize);
-
-            // Find position in table.
-            int idx = keyHash & (tbl.length - 1);
-
-            long meta = tbl[idx];
-
-            // Search for our key in collisions.
-            while (meta != 0) {
-                if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key.
-                    nextValue(valPtr, value(meta));
-
-                    value(meta, valPtr);
-
-                    return;
-                }
-
-                meta = collision(meta);
-            }
-
-            // Write key.
-            long keyPtr = write(0, key, keySer);
-            int keySize = writtenSize();
-
-            nextValue(valPtr, 0);
-
-            tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
-
-            if (++keys > (tbl.length >>> 2) * 3)
-                rehash();
-        }
-    }
-}
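Two details of the single-threaded GridHadoopHashMultimap are worth spelling out. First, the growth check ++keys > (tbl.length >>> 2) * 3 is simply "more than 3/4 full": (length >>> 2) * 3 is three quarters of the capacity, and rehash() then doubles the table, the same 0.75 load factor the concurrent variant uses. Second, each key is described by a fixed-size meta record written field by field in createMeta(): an int keyHash at offset 0, an int keySize at 4, then long pointers at 8, 16, and 24 (the concurrent variant appends a lastVisited pointer at 32, hence allocate(40) there versus allocate(32) here). A small ByteBuffer stand-in for that 32-byte layout, with an on-heap buffer in place of GridUnsafeMemory and invented demo values:

import java.nio.ByteBuffer;

public class MetaRecordDemo {
    /** Mirrors the offsets written by createMeta(...) in the deleted file above. */
    static ByteBuffer createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
        ByteBuffer meta = ByteBuffer.allocate(32); // allocate(32) in the original.

        meta.putInt(0, keyHash);
        meta.putInt(4, keySize);
        meta.putLong(8, keyPtr);
        meta.putLong(16, valPtr);
        meta.putLong(24, collisionPtr);

        return meta;
    }

    public static void main(String[] args) {
        ByteBuffer meta = createMeta(0xCAFE, 16, 1024L, 2048L, 0L);

        // Reads mirror the keyHash(meta) / value(meta) / collision(meta) accessors above.
        System.out.println("keyHash   = 0x" + Integer.toHexString(meta.getInt(0)));
        System.out.println("valPtr    = " + meta.getLong(16));
        System.out.println("collision = " + meta.getLong(24));
    }
}

Because values carry a 12-byte header (the valPtr + 12 seen throughout both files), a key's values form a linked stack threaded through these records, which is why appending a value only touches the valPtr slot at offset 16.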