http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java new file mode 100644 index 0000000..07bae6b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java @@ -0,0 +1,726 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Skip list. + */ +public class HadoopSkipList extends HadoopMultimapBase { + /** */ + private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive. + + /** Top level. */ + private final AtomicInteger topLevel = new AtomicInteger(-1); + + /** Heads for all the lists. */ + private final long heads; + + /** */ + private final AtomicBoolean visitGuard = new AtomicBoolean(); + + /** + * @param jobInfo Job info. + * @param mem Memory. + */ + public HadoopSkipList(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) { + super(jobInfo, mem); + + heads = mem.allocate(HEADS_SIZE, true); + } + + /** {@inheritDoc} */ + @Override public void close() { + super.close(); + + mem.release(heads, HEADS_SIZE); + } + + /** {@inheritDoc} */ + @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException { + if (!visitGuard.compareAndSet(false, true)) + return false; + + for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) { + long valPtr = value(meta); + + long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta); + + if (valPtr != lastVisited) { + long k = key(meta); + + v.onKey(k + 4, keySize(k)); + + lastVisitedValue(meta, valPtr); // Set it to the first value in chain. + + do { + v.onValue(valPtr + 12, valueSize(valPtr)); + + valPtr = nextValue(valPtr); + } + while (valPtr != lastVisited); + } + } + + visitGuard.lazySet(false); + + return true; + } + + /** {@inheritDoc} */ + @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException { + return new AdderImpl(ctx); + } + + /** {@inheritDoc} */ + @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + Input in = new Input(taskCtx); + + Comparator<Object> grpCmp = taskCtx.groupComparator(); + + if (grpCmp != null) + return new GroupedInput(grpCmp, in); + + return in; + } + + /** + * @param meta Meta pointer. + * @return Key pointer. + */ + private long key(long meta) { + return mem.readLong(meta); + } + + /** + * @param meta Meta pointer. + * @param key Key pointer. + */ + private void key(long meta, long key) { + mem.writeLong(meta, key); + } + + /** + * @param meta Meta pointer. + * @return Value pointer. + */ + private long value(long meta) { + return mem.readLongVolatile(meta + 8); + } + + /** + * @param meta Meta pointer. + * @param valPtr Value pointer. + */ + private void value(long meta, long valPtr) { + mem.writeLongVolatile(meta + 8, valPtr); + } + + /** + * @param meta Meta pointer. + * @param oldValPtr Old first value pointer. + * @param newValPtr New first value pointer. + * @return {@code true} If operation succeeded. + */ + private boolean casValue(long meta, long oldValPtr, long newValPtr) { + return mem.casLong(meta + 8, oldValPtr, newValPtr); + } + + /** + * @param meta Meta pointer. + * @return Last visited value pointer. + */ + private long lastVisitedValue(long meta) { + return mem.readLong(meta + 16); + } + + /** + * @param meta Meta pointer. + * @param valPtr Last visited value pointer. + */ + private void lastVisitedValue(long meta, long valPtr) { + mem.writeLong(meta + 16, valPtr); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @return Next meta pointer. + */ + private long nextMeta(long meta, int level) { + assert meta > 0 : meta; + + return mem.readLongVolatile(meta + 24 + 8 * level); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @param oldNext Old next meta pointer. + * @param newNext New next meta pointer. + * @return {@code true} If operation succeeded. + */ + private boolean casNextMeta(long meta, int level, long oldNext, long newNext) { + assert meta > 0 : meta; + + return mem.casLong(meta + 24 + 8 * level, oldNext, newNext); + } + + /** + * @param meta Meta pointer. + * @param level Level. + * @param nextMeta Next meta. + */ + private void nextMeta(long meta, int level, long nextMeta) { + assert meta != 0; + + mem.writeLong(meta + 24 + 8 * level, nextMeta); + } + + /** + * @param keyPtr Key pointer. + * @return Key size. + */ + private int keySize(long keyPtr) { + return mem.readInt(keyPtr); + } + + /** + * @param keyPtr Key pointer. + * @param keySize Key size. + */ + private void keySize(long keyPtr, int keySize) { + mem.writeInt(keyPtr, keySize); + } + + /** + * @param rnd Random. + * @return Next level. + */ + public static int randomLevel(Random rnd) { + int x = rnd.nextInt(); + + int level = 0; + + while ((x & 1) != 0) { // Count sequential 1 bits. + level++; + + x >>>= 1; + } + + return level; + } + + /** + * Reader. + */ + private class Reader extends ReaderBase { + /** + * @param ser Serialization. + */ + protected Reader(GridHadoopSerialization ser) { + super(ser); + } + + /** + * @param meta Meta pointer. + * @return Key. + */ + public Object readKey(long meta) { + assert meta > 0 : meta; + + long k = key(meta); + + try { + return read(k + 4, keySize(k)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + /** + * Adder. + */ + private class AdderImpl extends AdderBase { + /** */ + private final Comparator<Object> cmp; + + /** */ + private final Random rnd = new GridRandom(); + + /** */ + private final GridLongList stack = new GridLongList(16); + + /** */ + private final Reader keyReader; + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException { + super(ctx); + + keyReader = new Reader(keySer); + + cmp = ctx.sortComparator(); + } + + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) throws IgniteCheckedException { + A.notNull(val, "val"); + + add(key, val); + } + + /** {@inheritDoc} */ + @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { + KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse; + + k.tmpKey = keySer.read(in, k.tmpKey); + + k.meta = add(k.tmpKey, null); + + return k; + } + + /** + * @param key Key. + * @param val Value. + * @param level Level. + * @return Meta pointer. + */ + private long createMeta(long key, long val, int level) { + int size = 32 + 8 * level; + + long meta = allocate(size); + + key(meta, key); + value(meta, val); + lastVisitedValue(meta, 0L); + + for (int i = 32; i < size; i += 8) // Fill with 0. + mem.writeLong(meta + i, 0L); + + return meta; + } + + /** + * @param key Key. + * @return Pointer. + * @throws IgniteCheckedException If failed. + */ + private long writeKey(Object key) throws IgniteCheckedException { + long keyPtr = write(4, key, keySer); + int keySize = writtenSize() - 4; + + keySize(keyPtr, keySize); + + return keyPtr; + } + + /** + * @param prevMeta Previous meta. + * @param meta Next meta. + */ + private void stackPush(long prevMeta, long meta) { + stack.add(prevMeta); + stack.add(meta); + } + + /** + * Drops last remembered frame from the stack. + */ + private void stackPop() { + stack.pop(2); + } + + /** + * @param key Key. + * @param val Value. + * @return Meta pointer. + * @throws IgniteCheckedException If failed. + */ + private long add(Object key, @Nullable Object val) throws IgniteCheckedException { + assert key != null; + + stack.clear(); + + long valPtr = 0; + + if (val != null) { // Write value. + valPtr = write(12, val, valSer); + int valSize = writtenSize() - 12; + + nextValue(valPtr, 0); + valueSize(valPtr, valSize); + } + + long keyPtr = 0; + long newMeta = 0; + int newMetaLevel = -1; + + long prevMeta = heads; + int level = topLevel.get(); + long meta = level < 0 ? 0 : nextMeta(heads, level); + + for (;;) { + if (level < 0) { // We did not find our key, trying to add new meta. + if (keyPtr == 0) { // Write key and create meta only once. + keyPtr = writeKey(key); + + newMetaLevel = randomLevel(rnd); + newMeta = createMeta(keyPtr, valPtr, newMetaLevel); + } + + nextMeta(newMeta, 0, meta); // Set next to new meta before publishing. + + if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully. + laceUp(key, newMeta, newMetaLevel); + + return newMeta; + } + else { // Add failed, need to check out what was added by another thread. + meta = nextMeta(prevMeta, level = 0); + + stackPop(); + } + } + + int cmpRes = cmp(key, meta); + + if (cmpRes == 0) { // Key found. + if (newMeta != 0) // Deallocate if we've allocated something. + localDeallocate(keyPtr); + + if (valPtr == 0) // Only key needs to be added. + return meta; + + for (;;) { // Add value for the key found. + long nextVal = value(meta); + + nextValue(valPtr, nextVal); + + if (casValue(meta, nextVal, valPtr)) + return meta; + } + } + + assert cmpRes != 0; + + if (cmpRes > 0) { // Go right. + prevMeta = meta; + meta = nextMeta(meta, level); + + if (meta != 0) // If nothing to the right then go down. + continue; + } + + while (--level >= 0) { // Go down. + stackPush(prevMeta, meta); // Remember the path. + + long nextMeta = nextMeta(prevMeta, level); + + if (nextMeta != meta) { // If the meta is the same as on upper level go deeper. + meta = nextMeta; + + assert meta != 0; + + break; + } + } + } + } + + /** + * @param key Key. + * @param meta Meta pointer. + * @return Comparison result. + */ + @SuppressWarnings("unchecked") + private int cmp(Object key, long meta) { + assert meta != 0; + + return cmp.compare(key, keyReader.readKey(meta)); + } + + /** + * Adds appropriate index links between metas. + * + * @param newMeta Just added meta. + * @param newMetaLevel New level. + */ + private void laceUp(Object key, long newMeta, int newMetaLevel) { + for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up. + long prevMeta = heads; + long meta = 0; + + if (!stack.isEmpty()) { // Get the path back. + meta = stack.remove(); + prevMeta = stack.remove(); + } + + for (;;) { + nextMeta(newMeta, level, meta); + + if (casNextMeta(prevMeta, level, meta, newMeta)) + break; + + long oldMeta = meta; + + meta = nextMeta(prevMeta, level); // Reread meta. + + for (;;) { + int cmpRes = cmp(key, meta); + + if (cmpRes > 0) { // Go right. + prevMeta = meta; + meta = nextMeta(prevMeta, level); + + if (meta != oldMeta) // Old meta already known to be greater than ours or is 0. + continue; + } + + assert cmpRes != 0; // Two different metas with equal keys must be impossible. + + break; // Retry cas. + } + } + } + + if (!stack.isEmpty()) + return; // Our level already lower than top. + + for (;;) { // Raise top level. + int top = topLevel.get(); + + if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel)) + break; + } + } + + /** + * Key. + */ + private class KeyImpl implements Key { + /** */ + private long meta; + + /** */ + private Object tmpKey; + + /** + * @return Meta pointer for the key. + */ + public long address() { + return meta; + } + + /** + * @param val Value. + */ + @Override public void add(Value val) { + int size = val.size(); + + long valPtr = allocate(size + 12); + + val.copyTo(valPtr + 12); + + valueSize(valPtr, size); + + long nextVal; + + do { + nextVal = value(meta); + + nextValue(valPtr, nextVal); + } + while(!casValue(meta, nextVal, valPtr)); + } + } + } + + /** + * Task input. + */ + private class Input implements GridHadoopTaskInput { + /** */ + private long metaPtr = heads; + + /** */ + private final Reader keyReader; + + /** */ + private final Reader valReader; + + /** + * @param taskCtx Task context. + * @throws IgniteCheckedException If failed. + */ + private Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { + keyReader = new Reader(taskCtx.keySerialization()); + valReader = new Reader(taskCtx.valueSerialization()); + } + + /** {@inheritDoc} */ + @Override public boolean next() { + metaPtr = nextMeta(metaPtr, 0); + + return metaPtr != 0; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return keyReader.readKey(metaPtr); + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + return new ValueIterator(value(metaPtr), valReader); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + keyReader.close(); + valReader.close(); + } + } + + /** + * Grouped input using grouping comparator. + */ + private class GroupedInput implements GridHadoopTaskInput { + /** */ + private final Comparator<Object> grpCmp; + + /** */ + private final Input in; + + /** */ + private Object prevKey; + + /** */ + private Object nextKey; + + /** */ + private final GridLongList vals = new GridLongList(); + + /** + * @param grpCmp Grouping comparator. + * @param in Input. + */ + private GroupedInput(Comparator<Object> grpCmp, Input in) { + this.grpCmp = grpCmp; + this.in = in; + } + + /** {@inheritDoc} */ + @Override public boolean next() { + if (prevKey == null) { // First call. + if (!in.next()) + return false; + + prevKey = in.key(); + + assert prevKey != null; + + in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison. + + vals.add(value(in.metaPtr)); + } + else { + if (in.metaPtr == 0) // We reached the end of the input. + return false; + + vals.clear(); + + vals.add(value(in.metaPtr)); + + in.keyReader.resetReusedObject(prevKey); // Switch key instances. + + prevKey = nextKey; + } + + while (in.next()) { // Fill with head value pointers with equal keys. + if (grpCmp.compare(prevKey, nextKey = in.key()) == 0) + vals.add(value(in.metaPtr)); + else + break; + } + + assert !vals.isEmpty(); + + return true; + } + + /** {@inheritDoc} */ + @Override public Object key() { + return prevKey; + } + + /** {@inheritDoc} */ + @Override public Iterator<?> values() { + assert !vals.isEmpty(); + + final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader); + + return new Iterator<Object>() { + /** */ + private int idx; + + @Override public boolean hasNext() { + if (!valIter.hasNext()) { + if (++idx == vals.size()) + return false; + + valIter.head(vals.get(idx)); + + assert valIter.hasNext(); + } + + return true; + } + + @Override public Object next() { + return valIter.next(); + } + + @Override public void remove() { + valIter.remove(); + } + }; + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteCheckedException { + in.close(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java deleted file mode 100644 index 8b4f0c4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import org.apache.ignite.internal.util.offheap.unsafe.*; - -import java.io.*; -import java.nio.charset.*; - -/** - * Data input stream. - */ -public class GridHadoopDataInStream extends InputStream implements DataInput { - /** */ - private final GridHadoopOffheapBuffer buf = new GridHadoopOffheapBuffer(0, 0); - - /** */ - private final GridUnsafeMemory mem; - - /** - * @param mem Memory. - */ - public GridHadoopDataInStream(GridUnsafeMemory mem) { - assert mem != null; - - this.mem = mem; - } - - /** - * @return Buffer. - */ - public GridHadoopOffheapBuffer buffer() { - return buf; - } - - /** - * @param size Size. - * @return Old pointer. - */ - protected long move(long size) throws IOException { - long ptr = buf.move(size); - - assert ptr != 0; - - return ptr; - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - return readUnsignedByte(); - } - - /** {@inheritDoc} */ - @Override public int read(byte[] b, int off, int len) throws IOException { - readFully(b, off, len); - - return len; - } - - /** {@inheritDoc} */ - @Override public long skip(long n) throws IOException { - move(n); - - return n; - } - - /** {@inheritDoc} */ - @Override public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public void readFully(byte[] b, int off, int len) throws IOException { - mem.readBytes(move(len), b, off, len); - } - - /** {@inheritDoc} */ - @Override public int skipBytes(int n) throws IOException { - move(n); - - return n; - } - - /** {@inheritDoc} */ - @Override public boolean readBoolean() throws IOException { - byte res = readByte(); - - if (res == 1) - return true; - - assert res == 0 : res; - - return false; - } - - /** {@inheritDoc} */ - @Override public byte readByte() throws IOException { - return mem.readByte(move(1)); - } - - /** {@inheritDoc} */ - @Override public int readUnsignedByte() throws IOException { - return readByte() & 0xff; - } - - /** {@inheritDoc} */ - @Override public short readShort() throws IOException { - return mem.readShort(move(2)); - } - - /** {@inheritDoc} */ - @Override public int readUnsignedShort() throws IOException { - return readShort() & 0xffff; - } - - /** {@inheritDoc} */ - @Override public char readChar() throws IOException { - return (char)readShort(); - } - - /** {@inheritDoc} */ - @Override public int readInt() throws IOException { - return mem.readInt(move(4)); - } - - /** {@inheritDoc} */ - @Override public long readLong() throws IOException { - return mem.readLong(move(8)); - } - - /** {@inheritDoc} */ - @Override public float readFloat() throws IOException { - return mem.readFloat(move(4)); - } - - /** {@inheritDoc} */ - @Override public double readDouble() throws IOException { - return mem.readDouble(move(8)); - } - - /** {@inheritDoc} */ - @Override public String readLine() throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String readUTF() throws IOException { - byte[] bytes = new byte[readInt()]; - - if (bytes.length != 0) - readFully(bytes); - - return new String(bytes, StandardCharsets.UTF_8); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java deleted file mode 100644 index 8b837c8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import org.apache.ignite.internal.util.offheap.unsafe.*; - -import java.io.*; -import java.nio.charset.*; - -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Data output stream. - */ -public class GridHadoopDataOutStream extends OutputStream implements DataOutput { - /** */ - private final GridHadoopOffheapBuffer buf = new GridHadoopOffheapBuffer(0, 0); - - /** */ - private final GridUnsafeMemory mem; - - /** - * @param mem Memory. - */ - public GridHadoopDataOutStream(GridUnsafeMemory mem) { - this.mem = mem; - } - - /** - * @return Buffer. - */ - public GridHadoopOffheapBuffer buffer() { - return buf; - } - - /** - * @param size Size. - * @return Old pointer or {@code 0} if move was impossible. - */ - public long move(long size) { - return buf.move(size); - } - - /** {@inheritDoc} */ - @Override public void write(int b) { - writeByte(b); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b) { - write(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) { - UNSAFE.copyMemory(b, BYTE_ARR_OFF + off, null, move(len), len); - } - - /** {@inheritDoc} */ - @Override public void writeBoolean(boolean v) { - writeByte(v ? 1 : 0); - } - - /** {@inheritDoc} */ - @Override public void writeByte(int v) { - mem.writeByte(move(1), (byte)v); - } - - /** {@inheritDoc} */ - @Override public void writeShort(int v) { - mem.writeShort(move(2), (short)v); - } - - /** {@inheritDoc} */ - @Override public void writeChar(int v) { - writeShort(v); - } - - /** {@inheritDoc} */ - @Override public void writeInt(int v) { - mem.writeInt(move(4), v); - } - - /** {@inheritDoc} */ - @Override public void writeLong(long v) { - mem.writeLong(move(8), v); - } - - /** {@inheritDoc} */ - @Override public void writeFloat(float v) { - mem.writeFloat(move(4), v); - } - - /** {@inheritDoc} */ - @Override public void writeDouble(double v) { - mem.writeDouble(move(8), v); - } - - /** {@inheritDoc} */ - @Override public void writeBytes(String s) { - writeUTF(s); - } - - /** {@inheritDoc} */ - @Override public void writeChars(String s) { - writeUTF(s); - } - - /** {@inheritDoc} */ - @Override public void writeUTF(String s) { - byte[] b = s.getBytes(StandardCharsets.UTF_8); - - writeInt(b.length); - write(b); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java deleted file mode 100644 index f9f0e1d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -/** - * Offheap buffer. - */ -public class GridHadoopOffheapBuffer { - /** Buffer begin address. */ - private long bufPtr; - - /** The first address we do not own. */ - private long bufEnd; - - /** Current read or write pointer. */ - private long posPtr; - - /** - * @param bufPtr Pointer to buffer begin. - * @param bufSize Size of the buffer. - */ - public GridHadoopOffheapBuffer(long bufPtr, long bufSize) { - set(bufPtr, bufSize); - } - - /** - * @param bufPtr Pointer to buffer begin. - * @param bufSize Size of the buffer. - */ - public void set(long bufPtr, long bufSize) { - this.bufPtr = bufPtr; - - posPtr = bufPtr; - bufEnd = bufPtr + bufSize; - } - - /** - * @return Pointer to internal buffer begin. - */ - public long begin() { - return bufPtr; - } - - /** - * @return Buffer capacity. - */ - public long capacity() { - return bufEnd - bufPtr; - } - - /** - * @return Remaining capacity. - */ - public long remaining() { - return bufEnd - posPtr; - } - - /** - * @return Absolute pointer to the current position inside of the buffer. - */ - public long pointer() { - return posPtr; - } - - /** - * @param ptr Absolute pointer to the current position inside of the buffer. - */ - public void pointer(long ptr) { - assert ptr >= bufPtr : bufPtr + " <= " + ptr; - assert ptr <= bufEnd : bufEnd + " <= " + bufPtr; - - posPtr = ptr; - } - - /** - * @param size Size move on. - * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer. - */ - public long move(long size) { - assert size > 0 : size; - - long oldPos = posPtr; - long newPos = oldPos + size; - - if (newPos > bufEnd) - return 0; - - posPtr = newPos; - - return oldPos; - } - - /** - * @param ptr Pointer. - * @return {@code true} If the given pointer is inside of this buffer. - */ - public boolean isInside(long ptr) { - return ptr >= bufPtr && ptr <= bufEnd; - } - - /** - * Resets position to the beginning of buffer. - */ - public void reset() { - posPtr = bufPtr; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java new file mode 100644 index 0000000..8a1ee70 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import org.apache.ignite.internal.util.offheap.unsafe.*; + +import java.io.*; +import java.nio.charset.*; + +/** + * Data input stream. + */ +public class HadoopDataInStream extends InputStream implements DataInput { + /** */ + private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); + + /** */ + private final GridUnsafeMemory mem; + + /** + * @param mem Memory. + */ + public HadoopDataInStream(GridUnsafeMemory mem) { + assert mem != null; + + this.mem = mem; + } + + /** + * @return Buffer. + */ + public HadoopOffheapBuffer buffer() { + return buf; + } + + /** + * @param size Size. + * @return Old pointer. + */ + protected long move(long size) throws IOException { + long ptr = buf.move(size); + + assert ptr != 0; + + return ptr; + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return readUnsignedByte(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + readFully(b, off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public long skip(long n) throws IOException { + move(n); + + return n; + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b, int off, int len) throws IOException { + mem.readBytes(move(len), b, off, len); + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + move(n); + + return n; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() throws IOException { + byte res = readByte(); + + if (res == 1) + return true; + + assert res == 0 : res; + + return false; + } + + /** {@inheritDoc} */ + @Override public byte readByte() throws IOException { + return mem.readByte(move(1)); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedByte() throws IOException { + return readByte() & 0xff; + } + + /** {@inheritDoc} */ + @Override public short readShort() throws IOException { + return mem.readShort(move(2)); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** {@inheritDoc} */ + @Override public char readChar() throws IOException { + return (char)readShort(); + } + + /** {@inheritDoc} */ + @Override public int readInt() throws IOException { + return mem.readInt(move(4)); + } + + /** {@inheritDoc} */ + @Override public long readLong() throws IOException { + return mem.readLong(move(8)); + } + + /** {@inheritDoc} */ + @Override public float readFloat() throws IOException { + return mem.readFloat(move(4)); + } + + /** {@inheritDoc} */ + @Override public double readDouble() throws IOException { + return mem.readDouble(move(8)); + } + + /** {@inheritDoc} */ + @Override public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String readUTF() throws IOException { + byte[] bytes = new byte[readInt()]; + + if (bytes.length != 0) + readFully(bytes); + + return new String(bytes, StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java new file mode 100644 index 0000000..51bddf9 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import org.apache.ignite.internal.util.offheap.unsafe.*; + +import java.io.*; +import java.nio.charset.*; + +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * Data output stream. + */ +public class HadoopDataOutStream extends OutputStream implements DataOutput { + /** */ + private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); + + /** */ + private final GridUnsafeMemory mem; + + /** + * @param mem Memory. + */ + public HadoopDataOutStream(GridUnsafeMemory mem) { + this.mem = mem; + } + + /** + * @return Buffer. + */ + public HadoopOffheapBuffer buffer() { + return buf; + } + + /** + * @param size Size. + * @return Old pointer or {@code 0} if move was impossible. + */ + public long move(long size) { + return buf.move(size); + } + + /** {@inheritDoc} */ + @Override public void write(int b) { + writeByte(b); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b) { + write(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) { + UNSAFE.copyMemory(b, BYTE_ARR_OFF + off, null, move(len), len); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean v) { + writeByte(v ? 1 : 0); + } + + /** {@inheritDoc} */ + @Override public void writeByte(int v) { + mem.writeByte(move(1), (byte)v); + } + + /** {@inheritDoc} */ + @Override public void writeShort(int v) { + mem.writeShort(move(2), (short)v); + } + + /** {@inheritDoc} */ + @Override public void writeChar(int v) { + writeShort(v); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int v) { + mem.writeInt(move(4), v); + } + + /** {@inheritDoc} */ + @Override public void writeLong(long v) { + mem.writeLong(move(8), v); + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float v) { + mem.writeFloat(move(4), v); + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double v) { + mem.writeDouble(move(8), v); + } + + /** {@inheritDoc} */ + @Override public void writeBytes(String s) { + writeUTF(s); + } + + /** {@inheritDoc} */ + @Override public void writeChars(String s) { + writeUTF(s); + } + + /** {@inheritDoc} */ + @Override public void writeUTF(String s) { + byte[] b = s.getBytes(StandardCharsets.UTF_8); + + writeInt(b.length); + write(b); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java new file mode 100644 index 0000000..a8e7a33 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +/** + * Offheap buffer. + */ +public class HadoopOffheapBuffer { + /** Buffer begin address. */ + private long bufPtr; + + /** The first address we do not own. */ + private long bufEnd; + + /** Current read or write pointer. */ + private long posPtr; + + /** + * @param bufPtr Pointer to buffer begin. + * @param bufSize Size of the buffer. + */ + public HadoopOffheapBuffer(long bufPtr, long bufSize) { + set(bufPtr, bufSize); + } + + /** + * @param bufPtr Pointer to buffer begin. + * @param bufSize Size of the buffer. + */ + public void set(long bufPtr, long bufSize) { + this.bufPtr = bufPtr; + + posPtr = bufPtr; + bufEnd = bufPtr + bufSize; + } + + /** + * @return Pointer to internal buffer begin. + */ + public long begin() { + return bufPtr; + } + + /** + * @return Buffer capacity. + */ + public long capacity() { + return bufEnd - bufPtr; + } + + /** + * @return Remaining capacity. + */ + public long remaining() { + return bufEnd - posPtr; + } + + /** + * @return Absolute pointer to the current position inside of the buffer. + */ + public long pointer() { + return posPtr; + } + + /** + * @param ptr Absolute pointer to the current position inside of the buffer. + */ + public void pointer(long ptr) { + assert ptr >= bufPtr : bufPtr + " <= " + ptr; + assert ptr <= bufEnd : bufEnd + " <= " + bufPtr; + + posPtr = ptr; + } + + /** + * @param size Size move on. + * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer. + */ + public long move(long size) { + assert size > 0 : size; + + long oldPos = posPtr; + long newPos = oldPos + size; + + if (newPos > bufEnd) + return 0; + + posPtr = newPos; + + return oldPos; + } + + /** + * @param ptr Pointer. + * @return {@code true} If the given pointer is inside of this buffer. + */ + public boolean isInside(long ptr) { + return ptr >= bufPtr && ptr <= bufEnd; + } + + /** + * Resets position to the beginning of buffer. + */ + public void reset() { + posPtr = bufPtr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java deleted file mode 100644 index 9ec637b..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.thread.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.Collections.*; - -/** - * Executor service without thread pooling. - */ -public class GridHadoopExecutorService { - /** */ - private final LinkedBlockingQueue<Callable<?>> queue; - - /** */ - private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>()); - - /** */ - private final AtomicInteger active = new AtomicInteger(); - - /** */ - private final int maxTasks; - - /** */ - private final String gridName; - - /** */ - private final IgniteLogger log; - - /** */ - private volatile boolean shutdown; - - /** */ - private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() { - @Override public void onStopped(GridWorker w) { - workers.remove(w); - - if (shutdown) { - active.decrementAndGet(); - - return; - } - - Callable<?> task = queue.poll(); - - if (task != null) - startThread(task); - else { - active.decrementAndGet(); - - if (!queue.isEmpty()) - startFromQueue(); - } - } - }; - - /** - * @param log Logger. - * @param gridName Grid name. - * @param maxTasks Max number of tasks. - * @param maxQueue Max queue length. - */ - public GridHadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) { - assert maxTasks > 0 : maxTasks; - assert maxQueue > 0 : maxQueue; - - this.maxTasks = maxTasks; - this.queue = new LinkedBlockingQueue<>(maxQueue); - this.gridName = gridName; - this.log = log.getLogger(GridHadoopExecutorService.class); - } - - /** - * @return Number of active workers. - */ - public int active() { - return workers.size(); - } - - /** - * Submit task. - * - * @param task Task. - */ - public void submit(Callable<?> task) { - while (queue.isEmpty()) { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - startThread(task); - - return; // Started in new thread bypassing queue. - } - } - - try { - while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) { - if (shutdown) - return; // Rejected due to shutdown. - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - return; - } - - startFromQueue(); - } - - /** - * Attempts to start task from queue. - */ - private void startFromQueue() { - do { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - Callable<?> task = queue.poll(); - - if (task == null) { - int res = active.decrementAndGet(); - - assert res >= 0 : res; - - break; - } - - startThread(task); - } - } - while (!queue.isEmpty()); - } - - /** - * @param task Task. - */ - private void startThread(final Callable<?> task) { - String workerName; - - if (task instanceof GridHadoopRunnableTask) { - final GridHadoopTaskInfo i = ((GridHadoopRunnableTask)task).taskInfo(); - - workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); - } - else - workerName = task.toString(); - - GridWorker w = new GridWorker(gridName, workerName, log, lsnr) { - @Override protected void body() { - try { - task.call(); - } - catch (Exception e) { - log.error("Failed to execute task: " + task, e); - } - } - }; - - workers.add(w); - - if (shutdown) - w.cancel(); - - new IgniteThread(w).start(); - } - - /** - * Shuts down this executor service. - * - * @param awaitTimeMillis Time in milliseconds to wait for tasks completion. - * @return {@code true} If all tasks completed. - */ - public boolean shutdown(long awaitTimeMillis) { - shutdown = true; - - for (GridWorker w : workers) - w.cancel(); - - while (awaitTimeMillis > 0 && !workers.isEmpty()) { - try { - Thread.sleep(100); - - awaitTimeMillis -= 100; - } - catch (InterruptedException e) { - break; - } - } - - return workers.isEmpty(); - } - - /** - * @return {@code true} If method {@linkplain #shutdown(long)} was already called. - */ - public boolean isShutdown() { - return shutdown; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java index 0d49be9..1ce7d4a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java @@ -56,7 +56,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { private long execEndTs; /** */ - private GridHadoopMultimap combinerInput; + private HadoopMultimap combinerInput; /** */ private volatile GridHadoopTaskContext ctx; @@ -103,14 +103,14 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { Throwable err = null; - GridHadoopTaskState state = GridHadoopTaskState.COMPLETED; + HadoopTaskState state = HadoopTaskState.COMPLETED; - GridHadoopPerformanceCounter perfCntr = null; + HadoopPerformanceCounter perfCntr = null; try { ctx = job.getTaskContext(info); - perfCntr = GridHadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); + perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); perfCntr.onTaskSubmit(info, submitTs); perfCntr.onTaskPrepare(info, execStartTs); @@ -131,10 +131,10 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { } } catch (HadoopTaskCancelledException ignored) { - state = GridHadoopTaskState.CANCELED; + state = HadoopTaskState.CANCELED; } catch (Throwable e) { - state = GridHadoopTaskState.FAILED; + state = HadoopTaskState.FAILED; err = e; U.error(log, "Task execution failed.", e); @@ -145,7 +145,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { if (perfCntr != null) perfCntr.onTaskFinish(info, execEndTs); - onTaskFinished(new GridHadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); + onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); if (combinerInput != null) combinerInput.close(); @@ -161,7 +161,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { * @param perfCntr Performance counter. * @throws IgniteCheckedException If failed. */ - private void runTask(GridHadoopPerformanceCounter perfCntr) throws IgniteCheckedException { + private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException { if (cancelled) throw new HadoopTaskCancelledException("Task cancelled."); @@ -190,7 +190,7 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { /** * @param status Task status. */ - protected abstract void onTaskFinished(GridHadoopTaskStatus status); + protected abstract void onTaskFinished(HadoopTaskStatus status); /** * @param ctx Task context. @@ -248,8 +248,8 @@ public abstract class GridHadoopRunnableTask implements Callable<Void> { assert combinerInput == null; combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? - new GridHadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): - new GridHadoopSkipList(job.info(), mem); // TODO replace with red-black tree + new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): + new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree return combinerInput.startAdding(ctx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java deleted file mode 100644 index d1eaa66..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -/** -* State of the task. -*/ -public enum GridHadoopTaskState { - /** Running task. */ - RUNNING, - - /** Completed task. */ - COMPLETED, - - /** Failed task. */ - FAILED, - - /** Canceled task. */ - CANCELED, - - /** Process crashed. */ - CRASHED -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java deleted file mode 100644 index 89ef8c1..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Task status. - */ -public class GridHadoopTaskStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private GridHadoopTaskState state; - - /** */ - private Throwable failCause; - - /** */ - private GridHadoopCounters cntrs; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopTaskStatus() { - // No-op. - } - - /** - * Creates new instance. - * - * @param state Task state. - * @param failCause Failure cause (if any). - */ - public GridHadoopTaskStatus(GridHadoopTaskState state, @Nullable Throwable failCause) { - this(state, failCause, null); - } - - /** - * Creates new instance. - * - * @param state Task state. - * @param failCause Failure cause (if any). - * @param cntrs Task counters. - */ - public GridHadoopTaskStatus(GridHadoopTaskState state, @Nullable Throwable failCause, - @Nullable GridHadoopCounters cntrs) { - assert state != null; - - this.state = state; - this.failCause = failCause; - this.cntrs = cntrs; - } - - /** - * @return State. - */ - public GridHadoopTaskState state() { - return state; - } - - /** - * @return Fail cause. - */ - @Nullable public Throwable failCause() { - return failCause; - } - - /** - * @return Counters. - */ - @Nullable public GridHadoopCounters counters() { - return cntrs; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopTaskStatus.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(state); - out.writeObject(failCause); - out.writeObject(cntrs); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - state = (GridHadoopTaskState)in.readObject(); - failCause = (Throwable)in.readObject(); - cntrs = (GridHadoopCounters)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java index f896daa..934ff35 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java @@ -38,7 +38,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); /** Executor service to run tasks. */ - private GridHadoopExecutorService exec; + private HadoopExecutorService exec; /** {@inheritDoc} */ @Override public void onKernalStart() throws IgniteCheckedException { @@ -46,7 +46,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { jobTracker = ctx.jobTracker(); - exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(), + exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(), ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize()); } @@ -91,7 +91,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info, ctx.localNodeId()) { - @Override protected void onTaskFinished(GridHadoopTaskStatus status) { + @Override protected void onTaskFinished(HadoopTaskStatus status) { if (log.isDebugEnabled()) log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " + "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']'); @@ -136,7 +136,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { } /** {@inheritDoc} */ - @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException { + @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException { if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) { Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java new file mode 100644 index 0000000..19f903f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.thread.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.Collections.*; + +/** + * Executor service without thread pooling. + */ +public class HadoopExecutorService { + /** */ + private final LinkedBlockingQueue<Callable<?>> queue; + + /** */ + private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>()); + + /** */ + private final AtomicInteger active = new AtomicInteger(); + + /** */ + private final int maxTasks; + + /** */ + private final String gridName; + + /** */ + private final IgniteLogger log; + + /** */ + private volatile boolean shutdown; + + /** */ + private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() { + @Override public void onStopped(GridWorker w) { + workers.remove(w); + + if (shutdown) { + active.decrementAndGet(); + + return; + } + + Callable<?> task = queue.poll(); + + if (task != null) + startThread(task); + else { + active.decrementAndGet(); + + if (!queue.isEmpty()) + startFromQueue(); + } + } + }; + + /** + * @param log Logger. + * @param gridName Grid name. + * @param maxTasks Max number of tasks. + * @param maxQueue Max queue length. + */ + public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) { + assert maxTasks > 0 : maxTasks; + assert maxQueue > 0 : maxQueue; + + this.maxTasks = maxTasks; + this.queue = new LinkedBlockingQueue<>(maxQueue); + this.gridName = gridName; + this.log = log.getLogger(HadoopExecutorService.class); + } + + /** + * @return Number of active workers. + */ + public int active() { + return workers.size(); + } + + /** + * Submit task. + * + * @param task Task. + */ + public void submit(Callable<?> task) { + while (queue.isEmpty()) { + int active0 = active.get(); + + if (active0 == maxTasks) + break; + + if (active.compareAndSet(active0, active0 + 1)) { + startThread(task); + + return; // Started in new thread bypassing queue. + } + } + + try { + while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) { + if (shutdown) + return; // Rejected due to shutdown. + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + return; + } + + startFromQueue(); + } + + /** + * Attempts to start task from queue. + */ + private void startFromQueue() { + do { + int active0 = active.get(); + + if (active0 == maxTasks) + break; + + if (active.compareAndSet(active0, active0 + 1)) { + Callable<?> task = queue.poll(); + + if (task == null) { + int res = active.decrementAndGet(); + + assert res >= 0 : res; + + break; + } + + startThread(task); + } + } + while (!queue.isEmpty()); + } + + /** + * @param task Task. + */ + private void startThread(final Callable<?> task) { + String workerName; + + if (task instanceof GridHadoopRunnableTask) { + final GridHadoopTaskInfo i = ((GridHadoopRunnableTask)task).taskInfo(); + + workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); + } + else + workerName = task.toString(); + + GridWorker w = new GridWorker(gridName, workerName, log, lsnr) { + @Override protected void body() { + try { + task.call(); + } + catch (Exception e) { + log.error("Failed to execute task: " + task, e); + } + } + }; + + workers.add(w); + + if (shutdown) + w.cancel(); + + new IgniteThread(w).start(); + } + + /** + * Shuts down this executor service. + * + * @param awaitTimeMillis Time in milliseconds to wait for tasks completion. + * @return {@code true} If all tasks completed. + */ + public boolean shutdown(long awaitTimeMillis) { + shutdown = true; + + for (GridWorker w : workers) + w.cancel(); + + while (awaitTimeMillis > 0 && !workers.isEmpty()) { + try { + Thread.sleep(100); + + awaitTimeMillis -= 100; + } + catch (InterruptedException e) { + break; + } + } + + return workers.isEmpty(); + } + + /** + * @return {@code true} If method {@linkplain #shutdown(long)} was already called. + */ + public boolean isShutdown() { + return shutdown; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java index a3d3bf7..2da2373 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java @@ -53,5 +53,5 @@ public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { * * @param meta Job metadata. */ - public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException; + public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java new file mode 100644 index 0000000..cf2a28e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +/** +* State of the task. +*/ +public enum HadoopTaskState { + /** Running task. */ + RUNNING, + + /** Completed task. */ + COMPLETED, + + /** Failed task. */ + FAILED, + + /** Canceled task. */ + CANCELED, + + /** Process crashed. */ + CRASHED +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java new file mode 100644 index 0000000..490f0b2 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task status. + */ +public class HadoopTaskStatus implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private HadoopTaskState state; + + /** */ + private Throwable failCause; + + /** */ + private GridHadoopCounters cntrs; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopTaskStatus() { + // No-op. + } + + /** + * Creates new instance. + * + * @param state Task state. + * @param failCause Failure cause (if any). + */ + public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) { + this(state, failCause, null); + } + + /** + * Creates new instance. + * + * @param state Task state. + * @param failCause Failure cause (if any). + * @param cntrs Task counters. + */ + public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause, + @Nullable GridHadoopCounters cntrs) { + assert state != null; + + this.state = state; + this.failCause = failCause; + this.cntrs = cntrs; + } + + /** + * @return State. + */ + public HadoopTaskState state() { + return state; + } + + /** + * @return Fail cause. + */ + @Nullable public Throwable failCause() { + return failCause; + } + + /** + * @return Counters. + */ + @Nullable public GridHadoopCounters counters() { + return cntrs; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskStatus.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(state); + out.writeObject(failCause); + out.writeObject(cntrs); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + state = (HadoopTaskState)in.readObject(); + failCause = (Throwable)in.readObject(); + cntrs = (GridHadoopCounters)in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskMetadata.java deleted file mode 100644 index 39606bc..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskMetadata.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * External task metadata (classpath, JVM options) needed to start external process execution. - */ -public class GridHadoopExternalTaskMetadata { - /** Process classpath. */ - private Collection<String> classpath; - - /** JVM options. */ - @GridToStringInclude - private Collection<String> jvmOpts; - - /** - * @return JVM Options. - */ - public Collection<String> jvmOptions() { - return jvmOpts; - } - - /** - * @param jvmOpts JVM options. - */ - public void jvmOptions(Collection<String> jvmOpts) { - this.jvmOpts = jvmOpts; - } - - /** - * @return Classpath. - */ - public Collection<String> classpath() { - return classpath; - } - - /** - * @param classpath Classpath. - */ - public void classpath(Collection<String> classpath) { - this.classpath = classpath; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopExternalTaskMetadata.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopJobInfoUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopJobInfoUpdateRequest.java deleted file mode 100644 index 2a7c7a8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopJobInfoUpdateRequest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Job info update request. - */ -public class GridHadoopJobInfoUpdateRequest implements GridHadoopMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - @GridToStringInclude - private GridHadoopJobId jobId; - - /** Job phase. */ - @GridToStringInclude - private GridHadoopJobPhase jobPhase; - - /** Reducers addresses. */ - @GridToStringInclude - private GridHadoopProcessDescriptor[] reducersAddrs; - - /** - * Constructor required by {@link Externalizable}. - */ - public GridHadoopJobInfoUpdateRequest() { - // No-op. - } - - /** - * @param jobId Job ID. - * @param jobPhase Job phase. - * @param reducersAddrs Reducers addresses. - */ - public GridHadoopJobInfoUpdateRequest(GridHadoopJobId jobId, GridHadoopJobPhase jobPhase, - GridHadoopProcessDescriptor[] reducersAddrs) { - assert jobId != null; - - this.jobId = jobId; - this.jobPhase = jobPhase; - this.reducersAddrs = reducersAddrs; - } - - /** - * @return Job ID. - */ - public GridHadoopJobId jobId() { - return jobId; - } - - /** - * @return Job phase. - */ - public GridHadoopJobPhase jobPhase() { - return jobPhase; - } - - /** - * @return Reducers addresses. - */ - public GridHadoopProcessDescriptor[] reducersAddresses() { - return reducersAddrs; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - jobId.writeExternal(out); - - out.writeObject(jobPhase); - U.writeArray(out, reducersAddrs); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - jobId = new GridHadoopJobId(); - jobId.readExternal(in); - - jobPhase = (GridHadoopJobPhase)in.readObject(); - reducersAddrs = (GridHadoopProcessDescriptor[])U.readArray(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopJobInfoUpdateRequest.class, this); - } -}