http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java deleted file mode 100644 index 8b4f0c4..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import org.apache.ignite.internal.util.offheap.unsafe.*; - -import java.io.*; -import java.nio.charset.*; - -/** - * Data input stream. - */ -public class GridHadoopDataInStream extends InputStream implements DataInput { - /** */ - private final GridHadoopOffheapBuffer buf = new GridHadoopOffheapBuffer(0, 0); - - /** */ - private final GridUnsafeMemory mem; - - /** - * @param mem Memory. 
- */ - public GridHadoopDataInStream(GridUnsafeMemory mem) { - assert mem != null; - - this.mem = mem; - } - - /** - * @return Buffer. - */ - public GridHadoopOffheapBuffer buffer() { - return buf; - } - - /** - * @param size Size. - * @return Old pointer. - */ - protected long move(long size) throws IOException { - long ptr = buf.move(size); - - assert ptr != 0; - - return ptr; - } - - /** {@inheritDoc} */ - @Override public int read() throws IOException { - return readUnsignedByte(); - } - - /** {@inheritDoc} */ - @Override public int read(byte[] b, int off, int len) throws IOException { - readFully(b, off, len); - - return len; - } - - /** {@inheritDoc} */ - @Override public long skip(long n) throws IOException { - move(n); - - return n; - } - - /** {@inheritDoc} */ - @Override public void readFully(byte[] b) throws IOException { - readFully(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public void readFully(byte[] b, int off, int len) throws IOException { - mem.readBytes(move(len), b, off, len); - } - - /** {@inheritDoc} */ - @Override public int skipBytes(int n) throws IOException { - move(n); - - return n; - } - - /** {@inheritDoc} */ - @Override public boolean readBoolean() throws IOException { - byte res = readByte(); - - if (res == 1) - return true; - - assert res == 0 : res; - - return false; - } - - /** {@inheritDoc} */ - @Override public byte readByte() throws IOException { - return mem.readByte(move(1)); - } - - /** {@inheritDoc} */ - @Override public int readUnsignedByte() throws IOException { - return readByte() & 0xff; - } - - /** {@inheritDoc} */ - @Override public short readShort() throws IOException { - return mem.readShort(move(2)); - } - - /** {@inheritDoc} */ - @Override public int readUnsignedShort() throws IOException { - return readShort() & 0xffff; - } - - /** {@inheritDoc} */ - @Override public char readChar() throws IOException { - return (char)readShort(); - } - - /** {@inheritDoc} */ - @Override public int readInt() throws 
IOException { - return mem.readInt(move(4)); - } - - /** {@inheritDoc} */ - @Override public long readLong() throws IOException { - return mem.readLong(move(8)); - } - - /** {@inheritDoc} */ - @Override public float readFloat() throws IOException { - return mem.readFloat(move(4)); - } - - /** {@inheritDoc} */ - @Override public double readDouble() throws IOException { - return mem.readDouble(move(8)); - } - - /** {@inheritDoc} */ - @Override public String readLine() throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String readUTF() throws IOException { - byte[] bytes = new byte[readInt()]; - - if (bytes.length != 0) - readFully(bytes); - - return new String(bytes, StandardCharsets.UTF_8); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java deleted file mode 100644 index 8b837c8..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import org.apache.ignite.internal.util.offheap.unsafe.*; - -import java.io.*; -import java.nio.charset.*; - -import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; - -/** - * Data output stream. 
- */ -public class GridHadoopDataOutStream extends OutputStream implements DataOutput { - /** */ - private final GridHadoopOffheapBuffer buf = new GridHadoopOffheapBuffer(0, 0); - - /** */ - private final GridUnsafeMemory mem; - - /** - * @param mem Memory. - */ - public GridHadoopDataOutStream(GridUnsafeMemory mem) { - this.mem = mem; - } - - /** - * @return Buffer. - */ - public GridHadoopOffheapBuffer buffer() { - return buf; - } - - /** - * @param size Size. - * @return Old pointer or {@code 0} if move was impossible. - */ - public long move(long size) { - return buf.move(size); - } - - /** {@inheritDoc} */ - @Override public void write(int b) { - writeByte(b); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b) { - write(b, 0, b.length); - } - - /** {@inheritDoc} */ - @Override public void write(byte[] b, int off, int len) { - UNSAFE.copyMemory(b, BYTE_ARR_OFF + off, null, move(len), len); - } - - /** {@inheritDoc} */ - @Override public void writeBoolean(boolean v) { - writeByte(v ? 
1 : 0); - } - - /** {@inheritDoc} */ - @Override public void writeByte(int v) { - mem.writeByte(move(1), (byte)v); - } - - /** {@inheritDoc} */ - @Override public void writeShort(int v) { - mem.writeShort(move(2), (short)v); - } - - /** {@inheritDoc} */ - @Override public void writeChar(int v) { - writeShort(v); - } - - /** {@inheritDoc} */ - @Override public void writeInt(int v) { - mem.writeInt(move(4), v); - } - - /** {@inheritDoc} */ - @Override public void writeLong(long v) { - mem.writeLong(move(8), v); - } - - /** {@inheritDoc} */ - @Override public void writeFloat(float v) { - mem.writeFloat(move(4), v); - } - - /** {@inheritDoc} */ - @Override public void writeDouble(double v) { - mem.writeDouble(move(8), v); - } - - /** {@inheritDoc} */ - @Override public void writeBytes(String s) { - writeUTF(s); - } - - /** {@inheritDoc} */ - @Override public void writeChars(String s) { - writeUTF(s); - } - - /** {@inheritDoc} */ - @Override public void writeUTF(String s) { - byte[] b = s.getBytes(StandardCharsets.UTF_8); - - writeInt(b.length); - write(b); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java deleted file mode 100644 index f9f0e1d..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -/** - * Offheap buffer. - */ -public class GridHadoopOffheapBuffer { - /** Buffer begin address. */ - private long bufPtr; - - /** The first address we do not own. */ - private long bufEnd; - - /** Current read or write pointer. */ - private long posPtr; - - /** - * @param bufPtr Pointer to buffer begin. - * @param bufSize Size of the buffer. - */ - public GridHadoopOffheapBuffer(long bufPtr, long bufSize) { - set(bufPtr, bufSize); - } - - /** - * @param bufPtr Pointer to buffer begin. - * @param bufSize Size of the buffer. - */ - public void set(long bufPtr, long bufSize) { - this.bufPtr = bufPtr; - - posPtr = bufPtr; - bufEnd = bufPtr + bufSize; - } - - /** - * @return Pointer to internal buffer begin. - */ - public long begin() { - return bufPtr; - } - - /** - * @return Buffer capacity. - */ - public long capacity() { - return bufEnd - bufPtr; - } - - /** - * @return Remaining capacity. - */ - public long remaining() { - return bufEnd - posPtr; - } - - /** - * @return Absolute pointer to the current position inside of the buffer. - */ - public long pointer() { - return posPtr; - } - - /** - * @param ptr Absolute pointer to the current position inside of the buffer. 
- */ - public void pointer(long ptr) { - assert ptr >= bufPtr : bufPtr + " <= " + ptr; - assert ptr <= bufEnd : bufEnd + " <= " + bufPtr; - - posPtr = ptr; - } - - /** - * @param size Size move on. - * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer. - */ - public long move(long size) { - assert size > 0 : size; - - long oldPos = posPtr; - long newPos = oldPos + size; - - if (newPos > bufEnd) - return 0; - - posPtr = newPos; - - return oldPos; - } - - /** - * @param ptr Pointer. - * @return {@code true} If the given pointer is inside of this buffer. - */ - public boolean isInside(long ptr) { - return ptr >= bufPtr && ptr <= bufEnd; - } - - /** - * Resets position to the beginning of buffer. - */ - public void reset() { - posPtr = bufPtr; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java new file mode 100644 index 0000000..8a1ee70 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import org.apache.ignite.internal.util.offheap.unsafe.*; + +import java.io.*; +import java.nio.charset.*; + +/** + * Data input stream. + */ +public class HadoopDataInStream extends InputStream implements DataInput { + /** */ + private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); + + /** */ + private final GridUnsafeMemory mem; + + /** + * @param mem Memory. + */ + public HadoopDataInStream(GridUnsafeMemory mem) { + assert mem != null; + + this.mem = mem; + } + + /** + * @return Buffer. + */ + public HadoopOffheapBuffer buffer() { + return buf; + } + + /** + * @param size Size. + * @return Old pointer. 
+ */ + protected long move(long size) throws IOException { + long ptr = buf.move(size); + + assert ptr != 0; + + return ptr; + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return readUnsignedByte(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + readFully(b, off, len); + + return len; + } + + /** {@inheritDoc} */ + @Override public long skip(long n) throws IOException { + move(n); + + return n; + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b) throws IOException { + readFully(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b, int off, int len) throws IOException { + mem.readBytes(move(len), b, off, len); + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + move(n); + + return n; + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() throws IOException { + byte res = readByte(); + + if (res == 1) + return true; + + assert res == 0 : res; + + return false; + } + + /** {@inheritDoc} */ + @Override public byte readByte() throws IOException { + return mem.readByte(move(1)); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedByte() throws IOException { + return readByte() & 0xff; + } + + /** {@inheritDoc} */ + @Override public short readShort() throws IOException { + return mem.readShort(move(2)); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedShort() throws IOException { + return readShort() & 0xffff; + } + + /** {@inheritDoc} */ + @Override public char readChar() throws IOException { + return (char)readShort(); + } + + /** {@inheritDoc} */ + @Override public int readInt() throws IOException { + return mem.readInt(move(4)); + } + + /** {@inheritDoc} */ + @Override public long readLong() throws IOException { + return mem.readLong(move(8)); + } + + /** {@inheritDoc} */ + @Override public float readFloat() throws IOException { + return 
mem.readFloat(move(4)); + } + + /** {@inheritDoc} */ + @Override public double readDouble() throws IOException { + return mem.readDouble(move(8)); + } + + /** {@inheritDoc} */ + @Override public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String readUTF() throws IOException { + byte[] bytes = new byte[readInt()]; + + if (bytes.length != 0) + readFully(bytes); + + return new String(bytes, StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java new file mode 100644 index 0000000..51bddf9 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import org.apache.ignite.internal.util.offheap.unsafe.*; + +import java.io.*; +import java.nio.charset.*; + +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * Data output stream. + */ +public class HadoopDataOutStream extends OutputStream implements DataOutput { + /** */ + private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0); + + /** */ + private final GridUnsafeMemory mem; + + /** + * @param mem Memory. + */ + public HadoopDataOutStream(GridUnsafeMemory mem) { + this.mem = mem; + } + + /** + * @return Buffer. + */ + public HadoopOffheapBuffer buffer() { + return buf; + } + + /** + * @param size Size. + * @return Old pointer or {@code 0} if move was impossible. + */ + public long move(long size) { + return buf.move(size); + } + + /** {@inheritDoc} */ + @Override public void write(int b) { + writeByte(b); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b) { + write(b, 0, b.length); + } + + /** {@inheritDoc} */ + @Override public void write(byte[] b, int off, int len) { + UNSAFE.copyMemory(b, BYTE_ARR_OFF + off, null, move(len), len); + } + + /** {@inheritDoc} */ + @Override public void writeBoolean(boolean v) { + writeByte(v ? 
1 : 0); + } + + /** {@inheritDoc} */ + @Override public void writeByte(int v) { + mem.writeByte(move(1), (byte)v); + } + + /** {@inheritDoc} */ + @Override public void writeShort(int v) { + mem.writeShort(move(2), (short)v); + } + + /** {@inheritDoc} */ + @Override public void writeChar(int v) { + writeShort(v); + } + + /** {@inheritDoc} */ + @Override public void writeInt(int v) { + mem.writeInt(move(4), v); + } + + /** {@inheritDoc} */ + @Override public void writeLong(long v) { + mem.writeLong(move(8), v); + } + + /** {@inheritDoc} */ + @Override public void writeFloat(float v) { + mem.writeFloat(move(4), v); + } + + /** {@inheritDoc} */ + @Override public void writeDouble(double v) { + mem.writeDouble(move(8), v); + } + + /** {@inheritDoc} */ + @Override public void writeBytes(String s) { + writeUTF(s); + } + + /** {@inheritDoc} */ + @Override public void writeChars(String s) { + writeUTF(s); + } + + /** {@inheritDoc} */ + @Override public void writeUTF(String s) { + byte[] b = s.getBytes(StandardCharsets.UTF_8); + + writeInt(b.length); + write(b); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java new file mode 100644 index 0000000..a8e7a33 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +/** + * Offheap buffer. + */ +public class HadoopOffheapBuffer { + /** Buffer begin address. */ + private long bufPtr; + + /** The first address we do not own. */ + private long bufEnd; + + /** Current read or write pointer. */ + private long posPtr; + + /** + * @param bufPtr Pointer to buffer begin. + * @param bufSize Size of the buffer. + */ + public HadoopOffheapBuffer(long bufPtr, long bufSize) { + set(bufPtr, bufSize); + } + + /** + * @param bufPtr Pointer to buffer begin. + * @param bufSize Size of the buffer. + */ + public void set(long bufPtr, long bufSize) { + this.bufPtr = bufPtr; + + posPtr = bufPtr; + bufEnd = bufPtr + bufSize; + } + + /** + * @return Pointer to internal buffer begin. + */ + public long begin() { + return bufPtr; + } + + /** + * @return Buffer capacity. + */ + public long capacity() { + return bufEnd - bufPtr; + } + + /** + * @return Remaining capacity. + */ + public long remaining() { + return bufEnd - posPtr; + } + + /** + * @return Absolute pointer to the current position inside of the buffer. + */ + public long pointer() { + return posPtr; + } + + /** + * @param ptr Absolute pointer to the current position inside of the buffer. 
+ */ + public void pointer(long ptr) { + assert ptr >= bufPtr : bufPtr + " <= " + ptr; + assert ptr <= bufEnd : bufEnd + " <= " + bufPtr; + + posPtr = ptr; + } + + /** + * @param size Size move on. + * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer. + */ + public long move(long size) { + assert size > 0 : size; + + long oldPos = posPtr; + long newPos = oldPos + size; + + if (newPos > bufEnd) + return 0; + + posPtr = newPos; + + return oldPos; + } + + /** + * @param ptr Pointer. + * @return {@code true} If the given pointer is inside of this buffer. + */ + public boolean isInside(long ptr) { + return ptr >= bufPtr && ptr <= bufEnd; + } + + /** + * Resets position to the beginning of buffer. + */ + public void reset() { + posPtr = bufPtr; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java deleted file mode 100644 index fde5400..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; - - -/** - * Task executor. - */ -public class GridHadoopEmbeddedTaskExecutor extends GridHadoopTaskExecutorAdapter { - /** Job tracker. */ - private GridHadoopJobTracker jobTracker; - - /** */ - private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); - - /** Executor service to run tasks. 
*/ - private GridHadoopExecutorService exec; - - /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { - super.onKernalStart(); - - jobTracker = ctx.jobTracker(); - - exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(), - ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (exec != null) { - exec.shutdown(3000); - - if (cancel) { - for (GridHadoopJobId jobId : jobs.keySet()) - cancelTasks(jobId); - } - } - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) { - if (exec != null && !exec.shutdown(30000)) - U.warn(log, "Failed to finish running tasks in 30 sec."); - } - - /** {@inheritDoc} */ - @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + - ", tasksCnt=" + tasks.size() + ']'); - - Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id()); - - if (executedTasks == null) { - executedTasks = new GridConcurrentHashSet<>(); - - Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); - - assert extractedCol == null; - } - - final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks; - - for (final GridHadoopTaskInfo info : tasks) { - assert info != null; - - GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info, - ctx.localNodeId()) { - @Override protected void onTaskFinished(GridHadoopTaskStatus status) { - if (log.isDebugEnabled()) - log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " + - "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']'); - - finalExecutedTasks.remove(this); - - jobTracker.onTaskFinished(info, status); - } - - @Override 
protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().input(taskCtx); - } - - @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException { - return ctx.shuffle().output(taskCtx); - } - }; - - executedTasks.add(task); - - exec.submit(task); - } - } - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - @Override public void cancelTasks(GridHadoopJobId jobId) { - Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId); - - if (executedTasks != null) { - for (GridHadoopRunnableTask task : executedTasks) - task.cancel(); - } - } - - /** {@inheritDoc} */ - @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException { - if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) { - Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); - - assert executedTasks == null || executedTasks.isEmpty(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java deleted file mode 100644 index 9ec637b..0000000 --- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.thread.*; -import org.jdk8.backport.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.Collections.*; - -/** - * Executor service without thread pooling. 
- */ -public class GridHadoopExecutorService { - /** */ - private final LinkedBlockingQueue<Callable<?>> queue; - - /** */ - private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>()); - - /** */ - private final AtomicInteger active = new AtomicInteger(); - - /** */ - private final int maxTasks; - - /** */ - private final String gridName; - - /** */ - private final IgniteLogger log; - - /** */ - private volatile boolean shutdown; - - /** */ - private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() { - @Override public void onStopped(GridWorker w) { - workers.remove(w); - - if (shutdown) { - active.decrementAndGet(); - - return; - } - - Callable<?> task = queue.poll(); - - if (task != null) - startThread(task); - else { - active.decrementAndGet(); - - if (!queue.isEmpty()) - startFromQueue(); - } - } - }; - - /** - * @param log Logger. - * @param gridName Grid name. - * @param maxTasks Max number of tasks. - * @param maxQueue Max queue length. - */ - public GridHadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) { - assert maxTasks > 0 : maxTasks; - assert maxQueue > 0 : maxQueue; - - this.maxTasks = maxTasks; - this.queue = new LinkedBlockingQueue<>(maxQueue); - this.gridName = gridName; - this.log = log.getLogger(GridHadoopExecutorService.class); - } - - /** - * @return Number of active workers. - */ - public int active() { - return workers.size(); - } - - /** - * Submit task. - * - * @param task Task. - */ - public void submit(Callable<?> task) { - while (queue.isEmpty()) { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - startThread(task); - - return; // Started in new thread bypassing queue. - } - } - - try { - while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) { - if (shutdown) - return; // Rejected due to shutdown. 
- } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - return; - } - - startFromQueue(); - } - - /** - * Attempts to start task from queue. - */ - private void startFromQueue() { - do { - int active0 = active.get(); - - if (active0 == maxTasks) - break; - - if (active.compareAndSet(active0, active0 + 1)) { - Callable<?> task = queue.poll(); - - if (task == null) { - int res = active.decrementAndGet(); - - assert res >= 0 : res; - - break; - } - - startThread(task); - } - } - while (!queue.isEmpty()); - } - - /** - * @param task Task. - */ - private void startThread(final Callable<?> task) { - String workerName; - - if (task instanceof GridHadoopRunnableTask) { - final GridHadoopTaskInfo i = ((GridHadoopRunnableTask)task).taskInfo(); - - workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); - } - else - workerName = task.toString(); - - GridWorker w = new GridWorker(gridName, workerName, log, lsnr) { - @Override protected void body() { - try { - task.call(); - } - catch (Exception e) { - log.error("Failed to execute task: " + task, e); - } - } - }; - - workers.add(w); - - if (shutdown) - w.cancel(); - - new IgniteThread(w).start(); - } - - /** - * Shuts down this executor service. - * - * @param awaitTimeMillis Time in milliseconds to wait for tasks completion. - * @return {@code true} If all tasks completed. - */ - public boolean shutdown(long awaitTimeMillis) { - shutdown = true; - - for (GridWorker w : workers) - w.cancel(); - - while (awaitTimeMillis > 0 && !workers.isEmpty()) { - try { - Thread.sleep(100); - - awaitTimeMillis -= 100; - } - catch (InterruptedException e) { - break; - } - } - - return workers.isEmpty(); - } - - /** - * @return {@code true} If method {@linkplain #shutdown(long)} was already called. 
- */ - public boolean isShutdown() { - return shutdown; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java deleted file mode 100644 index fd4a030..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; - -/** - * Runnable task. - */ -public abstract class GridHadoopRunnableTask implements Callable<Void> { - /** */ - private final GridUnsafeMemory mem; - - /** */ - private final IgniteLogger log; - - /** */ - private final GridHadoopJob job; - - /** Task to run. */ - private final GridHadoopTaskInfo info; - - /** Submit time. */ - private final long submitTs = U.currentTimeMillis(); - - /** Execution start timestamp. */ - private long execStartTs; - - /** Execution end timestamp. */ - private long execEndTs; - - /** */ - private GridHadoopMultimap combinerInput; - - /** */ - private volatile GridHadoopTaskContext ctx; - - /** Set if task is to cancelling. */ - private volatile boolean cancelled; - - /** Node id. */ - private UUID nodeId; - - /** - * @param log Log. - * @param job Job. - * @param mem Memory. - * @param info Task info. - * @param nodeId Node id. - */ - protected GridHadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info, - UUID nodeId) { - this.nodeId = nodeId; - this.log = log.getLogger(GridHadoopRunnableTask.class); - this.job = job; - this.mem = mem; - this.info = info; - } - - /** - * @return Wait time. - */ - public long waitTime() { - return execStartTs - submitTs; - } - - /** - * @return Execution time. 
- */ - public long executionTime() { - return execEndTs - execStartTs; - } - - /** {@inheritDoc} */ - @Override public Void call() throws IgniteCheckedException { - execStartTs = U.currentTimeMillis(); - - Throwable err = null; - - GridHadoopTaskState state = GridHadoopTaskState.COMPLETED; - - GridHadoopPerformanceCounter perfCntr = null; - - try { - ctx = job.getTaskContext(info); - - perfCntr = GridHadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); - - perfCntr.onTaskSubmit(info, submitTs); - perfCntr.onTaskPrepare(info, execStartTs); - - ctx.prepareTaskEnvironment(); - - runTask(perfCntr); - - if (info.type() == MAP && job.info().hasCombiner()) { - ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); - - try { - runTask(perfCntr); - } - finally { - ctx.taskInfo(info); - } - } - } - catch (GridHadoopTaskCancelledException ignored) { - state = GridHadoopTaskState.CANCELED; - } - catch (Throwable e) { - state = GridHadoopTaskState.FAILED; - err = e; - - U.error(log, "Task execution failed.", e); - } - finally { - execEndTs = U.currentTimeMillis(); - - if (perfCntr != null) - perfCntr.onTaskFinish(info, execEndTs); - - onTaskFinished(new GridHadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); - - if (combinerInput != null) - combinerInput.close(); - - if (ctx != null) - ctx.cleanupTaskEnvironment(); - } - - return null; - } - - /** - * @param perfCntr Performance counter. - * @throws IgniteCheckedException If failed. - */ - private void runTask(GridHadoopPerformanceCounter perfCntr) throws IgniteCheckedException { - if (cancelled) - throw new GridHadoopTaskCancelledException("Task cancelled."); - - try (GridHadoopTaskOutput out = createOutputInternal(ctx); - GridHadoopTaskInput in = createInputInternal(ctx)) { - - ctx.input(in); - ctx.output(out); - - perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis()); - - ctx.run(); - } - } - - /** - * Cancel the executed task. 
- */ - public void cancel() { - cancelled = true; - - if (ctx != null) - ctx.cancel(); - } - - /** - * @param status Task status. - */ - protected abstract void onTaskFinished(GridHadoopTaskStatus status); - - /** - * @param ctx Task context. - * @return Task input. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case MAP: - case COMMIT: - case ABORT: - return null; - - case COMBINE: - assert combinerInput != null; - - return combinerInput.input(ctx); - - default: - return createInput(ctx); - } - } - - /** - * @param ctx Task context. - * @return Input. - * @throws IgniteCheckedException If failed. - */ - protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Output. - * @throws IgniteCheckedException If failed. - */ - protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Task output. - * @throws IgniteCheckedException If failed. - */ - private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case REDUCE: - case COMMIT: - case ABORT: - return null; - - case MAP: - if (job.info().hasCombiner()) { - assert combinerInput == null; - - combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? - new GridHadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): - new GridHadoopSkipList(job.info(), mem); // TODO replace with red-black tree - - return combinerInput.startAdding(ctx); - } - - default: - return createOutput(ctx); - } - } - - /** - * @return Task info. 
- */ - public GridHadoopTaskInfo taskInfo() { - return info; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java deleted file mode 100644 index 8f66190..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.jobtracker.*; - -import java.util.*; - -/** - * Common superclass for task executor. 
- */ -public abstract class GridHadoopTaskExecutorAdapter extends GridHadoopComponent { - /** - * Runs tasks. - * - * @param job Job. - * @param tasks Tasks. - * @throws IgniteCheckedException If failed. - */ - public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException; - - /** - * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks - * for this job ID. - * <p> - * It is guaranteed that this method will not be called concurrently with - * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via - * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called. - * - * @param jobId Job ID to cancel. - */ - public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException; - - /** - * On job state change callback; - * - * @param meta Job metadata. - */ - public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java deleted file mode 100644 index d1eaa66..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -/** -* State of the task. -*/ -public enum GridHadoopTaskState { - /** Running task. */ - RUNNING, - - /** Completed task. */ - COMPLETED, - - /** Failed task. */ - FAILED, - - /** Canceled task. */ - CANCELED, - - /** Process crashed. */ - CRASHED -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java deleted file mode 100644 index 89ef8c1..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * Task status. - */ -public class GridHadoopTaskStatus implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private GridHadoopTaskState state; - - /** */ - private Throwable failCause; - - /** */ - private GridHadoopCounters cntrs; - - /** - * Default constructor required by {@link Externalizable}. - */ - public GridHadoopTaskStatus() { - // No-op. - } - - /** - * Creates new instance. - * - * @param state Task state. - * @param failCause Failure cause (if any). - */ - public GridHadoopTaskStatus(GridHadoopTaskState state, @Nullable Throwable failCause) { - this(state, failCause, null); - } - - /** - * Creates new instance. - * - * @param state Task state. - * @param failCause Failure cause (if any). - * @param cntrs Task counters. - */ - public GridHadoopTaskStatus(GridHadoopTaskState state, @Nullable Throwable failCause, - @Nullable GridHadoopCounters cntrs) { - assert state != null; - - this.state = state; - this.failCause = failCause; - this.cntrs = cntrs; - } - - /** - * @return State. - */ - public GridHadoopTaskState state() { - return state; - } - - /** - * @return Fail cause. - */ - @Nullable public Throwable failCause() { - return failCause; - } - - /** - * @return Counters. 
- */ - @Nullable public GridHadoopCounters counters() { - return cntrs; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridHadoopTaskStatus.class, this); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(state); - out.writeObject(failCause); - out.writeObject(cntrs); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - state = (GridHadoopTaskState)in.readObject(); - failCause = (Throwable)in.readObject(); - cntrs = (GridHadoopCounters)in.readObject(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java new file mode 100644 index 0000000..a3c20d8 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; + + +/** + * Task executor. + */ +public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { + /** Job tracker. */ + private HadoopJobTracker jobTracker; + + /** */ + private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); + + /** Executor service to run tasks. 
*/ + private HadoopExecutorService exec; + + /** {@inheritDoc} */ + @Override public void onKernalStart() throws IgniteCheckedException { + super.onKernalStart(); + + jobTracker = ctx.jobTracker(); + + exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(), + ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize()); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + if (exec != null) { + exec.shutdown(3000); + + if (cancel) { + for (HadoopJobId jobId : jobs.keySet()) + cancelTasks(jobId); + } + } + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + if (exec != null && !exec.shutdown(30000)) + U.warn(log, "Failed to finish running tasks in 30 sec."); + } + + /** {@inheritDoc} */ + @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + + ", tasksCnt=" + tasks.size() + ']'); + + Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id()); + + if (executedTasks == null) { + executedTasks = new GridConcurrentHashSet<>(); + + Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); + + assert extractedCol == null; + } + + final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks; + + for (final HadoopTaskInfo info : tasks) { + assert info != null; + + HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info, + ctx.localNodeId()) { + @Override protected void onTaskFinished(HadoopTaskStatus status) { + if (log.isDebugEnabled()) + log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " + + "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']'); + + finalExecutedTasks.remove(this); + + jobTracker.onTaskFinished(info, status); + } + + @Override protected HadoopTaskInput 
createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException { + return ctx.shuffle().input(taskCtx); + } + + @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException { + return ctx.shuffle().output(taskCtx); + } + }; + + executedTasks.add(task); + + exec.submit(task); + } + } + + /** + * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks + * for this job ID. + * <p> + * It is guaranteed that this method will not be called concurrently with + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. + * + * @param jobId Job ID to cancel. + */ + @Override public void cancelTasks(HadoopJobId jobId) { + Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId); + + if (executedTasks != null) { + for (HadoopRunnableTask task : executedTasks) + task.cancel(); + } + } + + /** {@inheritDoc} */ + @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException { + if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { + Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); + + assert executedTasks == null || executedTasks.isEmpty(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java new file mode 100644 index 0000000..1c318e9 --- /dev/null +++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.thread.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.Collections.*; + +/** + * Executor service without thread pooling. 
+ */ +public class HadoopExecutorService { + /** */ + private final LinkedBlockingQueue<Callable<?>> queue; + + /** */ + private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>()); + + /** */ + private final AtomicInteger active = new AtomicInteger(); + + /** */ + private final int maxTasks; + + /** */ + private final String gridName; + + /** */ + private final IgniteLogger log; + + /** */ + private volatile boolean shutdown; + + /** */ + private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() { + @Override public void onStopped(GridWorker w) { + workers.remove(w); + + if (shutdown) { + active.decrementAndGet(); + + return; + } + + Callable<?> task = queue.poll(); + + if (task != null) + startThread(task); + else { + active.decrementAndGet(); + + if (!queue.isEmpty()) + startFromQueue(); + } + } + }; + + /** + * @param log Logger. + * @param gridName Grid name. + * @param maxTasks Max number of tasks. + * @param maxQueue Max queue length. + */ + public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) { + assert maxTasks > 0 : maxTasks; + assert maxQueue > 0 : maxQueue; + + this.maxTasks = maxTasks; + this.queue = new LinkedBlockingQueue<>(maxQueue); + this.gridName = gridName; + this.log = log.getLogger(HadoopExecutorService.class); + } + + /** + * @return Number of active workers. + */ + public int active() { + return workers.size(); + } + + /** + * Submit task. + * + * @param task Task. + */ + public void submit(Callable<?> task) { + while (queue.isEmpty()) { + int active0 = active.get(); + + if (active0 == maxTasks) + break; + + if (active.compareAndSet(active0, active0 + 1)) { + startThread(task); + + return; // Started in new thread bypassing queue. + } + } + + try { + while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) { + if (shutdown) + return; // Rejected due to shutdown. 
+ } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + return; + } + + startFromQueue(); + } + + /** + * Attempts to start task from queue. + */ + private void startFromQueue() { + do { + int active0 = active.get(); + + if (active0 == maxTasks) + break; + + if (active.compareAndSet(active0, active0 + 1)) { + Callable<?> task = queue.poll(); + + if (task == null) { + int res = active.decrementAndGet(); + + assert res >= 0 : res; + + break; + } + + startThread(task); + } + } + while (!queue.isEmpty()); + } + + /** + * @param task Task. + */ + private void startThread(final Callable<?> task) { + String workerName; + + if (task instanceof HadoopRunnableTask) { + final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo(); + + workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); + } + else + workerName = task.toString(); + + GridWorker w = new GridWorker(gridName, workerName, log, lsnr) { + @Override protected void body() { + try { + task.call(); + } + catch (Exception e) { + log.error("Failed to execute task: " + task, e); + } + } + }; + + workers.add(w); + + if (shutdown) + w.cancel(); + + new IgniteThread(w).start(); + } + + /** + * Shuts down this executor service. + * + * @param awaitTimeMillis Time in milliseconds to wait for tasks completion. + * @return {@code true} If all tasks completed. + */ + public boolean shutdown(long awaitTimeMillis) { + shutdown = true; + + for (GridWorker w : workers) + w.cancel(); + + while (awaitTimeMillis > 0 && !workers.isEmpty()) { + try { + Thread.sleep(100); + + awaitTimeMillis -= 100; + } + catch (InterruptedException e) { + break; + } + } + + return workers.isEmpty(); + } + + /** + * @return {@code true} If method {@linkplain #shutdown(long)} was already called. 
+ */ + public boolean isShutdown() { + return shutdown; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java new file mode 100644 index 0000000..2b36267 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*; +import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*; + +/** + * Runnable task. + */ +public abstract class HadoopRunnableTask implements Callable<Void> { + /** */ + private final GridUnsafeMemory mem; + + /** */ + private final IgniteLogger log; + + /** */ + private final HadoopJob job; + + /** Task to run. */ + private final HadoopTaskInfo info; + + /** Submit time. */ + private final long submitTs = U.currentTimeMillis(); + + /** Execution start timestamp. */ + private long execStartTs; + + /** Execution end timestamp. */ + private long execEndTs; + + /** */ + private HadoopMultimap combinerInput; + + /** */ + private volatile HadoopTaskContext ctx; + + /** Set if task is to cancelling. */ + private volatile boolean cancelled; + + /** Node id. */ + private UUID nodeId; + + /** + * @param log Log. + * @param job Job. + * @param mem Memory. + * @param info Task info. + * @param nodeId Node id. + */ + protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info, + UUID nodeId) { + this.nodeId = nodeId; + this.log = log.getLogger(HadoopRunnableTask.class); + this.job = job; + this.mem = mem; + this.info = info; + } + + /** + * @return Wait time. + */ + public long waitTime() { + return execStartTs - submitTs; + } + + /** + * @return Execution time. 
+ */ + public long executionTime() { + return execEndTs - execStartTs; + } + + /** {@inheritDoc} */ + @Override public Void call() throws IgniteCheckedException { + execStartTs = U.currentTimeMillis(); + + Throwable err = null; + + HadoopTaskState state = HadoopTaskState.COMPLETED; + + HadoopPerformanceCounter perfCntr = null; + + try { + ctx = job.getTaskContext(info); + + perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); + + perfCntr.onTaskSubmit(info, submitTs); + perfCntr.onTaskPrepare(info, execStartTs); + + ctx.prepareTaskEnvironment(); + + runTask(perfCntr); + + if (info.type() == MAP && job.info().hasCombiner()) { + ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); + + try { + runTask(perfCntr); + } + finally { + ctx.taskInfo(info); + } + } + } + catch (HadoopTaskCancelledException ignored) { + state = HadoopTaskState.CANCELED; + } + catch (Throwable e) { + state = HadoopTaskState.FAILED; + err = e; + + U.error(log, "Task execution failed.", e); + } + finally { + execEndTs = U.currentTimeMillis(); + + if (perfCntr != null) + perfCntr.onTaskFinish(info, execEndTs); + + onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); + + if (combinerInput != null) + combinerInput.close(); + + if (ctx != null) + ctx.cleanupTaskEnvironment(); + } + + return null; + } + + /** + * @param perfCntr Performance counter. + * @throws IgniteCheckedException If failed. + */ + private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + try (HadoopTaskOutput out = createOutputInternal(ctx); + HadoopTaskInput in = createInputInternal(ctx)) { + + ctx.input(in); + ctx.output(out); + + perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis()); + + ctx.run(); + } + } + + /** + * Cancel the executed task. 
+ */ + public void cancel() { + cancelled = true; + + if (ctx != null) + ctx.cancel(); + } + + /** + * @param status Task status. + */ + protected abstract void onTaskFinished(HadoopTaskStatus status); + + /** + * @param ctx Task context. + * @return Task input. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { + switch (ctx.taskInfo().type()) { + case SETUP: + case MAP: + case COMMIT: + case ABORT: + return null; + + case COMBINE: + assert combinerInput != null; + + return combinerInput.input(ctx); + + default: + return createInput(ctx); + } + } + + /** + * @param ctx Task context. + * @return Input. + * @throws IgniteCheckedException If failed. + */ + protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException; + + /** + * @param ctx Task info. + * @return Output. + * @throws IgniteCheckedException If failed. + */ + protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException; + + /** + * @param ctx Task info. + * @return Task output. + * @throws IgniteCheckedException If failed. + */ + private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException { + switch (ctx.taskInfo().type()) { + case SETUP: + case REDUCE: + case COMMIT: + case ABORT: + return null; + + case MAP: + if (job.info().hasCombiner()) { + assert combinerInput == null; + + combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? + new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): + new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree + + return combinerInput.startAdding(ctx); + } + + default: + return createOutput(ctx); + } + } + + /** + * @return Task info. 
+ */ + public HadoopTaskInfo taskInfo() { + return info; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java new file mode 100644 index 0000000..39b4935 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; + +import java.util.*; + +/** + * Common superclass for task executor. + */ +public abstract class HadoopTaskExecutorAdapter extends HadoopComponent { + /** + * Runs tasks. 
+ * + * @param job Job. + * @param tasks Tasks. + * @throws IgniteCheckedException If failed. + */ + public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException; + + /** + * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks + * for this job ID. + * <p> + * It is guaranteed that this method will not be called concurrently with + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via + * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called. + * + * @param jobId Job ID to cancel. + */ + public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException; + + /** + * On job state change callback; + * + * @param meta Job metadata. + */ + public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java new file mode 100644 index 0000000..cf2a28e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * State of the task.
 */
public enum HadoopTaskState {
    /** Task is running. */
    RUNNING,

    /** Task has completed. */
    COMPLETED,

    /** Task has failed. */
    FAILED,

    /** Task was canceled. */
    CANCELED,

    /** Process crashed. */
    CRASHED
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * Task status. + */ +public class HadoopTaskStatus implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private HadoopTaskState state; + + /** */ + private Throwable failCause; + + /** */ + private HadoopCounters cntrs; + + /** + * Default constructor required by {@link Externalizable}. + */ + public HadoopTaskStatus() { + // No-op. + } + + /** + * Creates new instance. + * + * @param state Task state. + * @param failCause Failure cause (if any). + */ + public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) { + this(state, failCause, null); + } + + /** + * Creates new instance. + * + * @param state Task state. + * @param failCause Failure cause (if any). + * @param cntrs Task counters. + */ + public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause, + @Nullable HadoopCounters cntrs) { + assert state != null; + + this.state = state; + this.failCause = failCause; + this.cntrs = cntrs; + } + + /** + * @return State. + */ + public HadoopTaskState state() { + return state; + } + + /** + * @return Fail cause. + */ + @Nullable public Throwable failCause() { + return failCause; + } + + /** + * @return Counters. 
+ */ + @Nullable public HadoopCounters counters() { + return cntrs; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskStatus.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(state); + out.writeObject(failCause); + out.writeObject(cntrs); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + state = (HadoopTaskState)in.readObject(); + failCause = (Throwable)in.readObject(); + cntrs = (HadoopCounters)in.readObject(); + } +}