# IGNITE-386: WIP on internal namings (5).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e1ffc10f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e1ffc10f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e1ffc10f Branch: refs/heads/ignite-386 Commit: e1ffc10f6332acb3d8b8b29f6602cfdc0d45be3c Parents: 17c8d0d Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Mar 3 16:08:03 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Mar 3 16:08:03 2015 +0300 ---------------------------------------------------------------------- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 6 +- .../hadoop/fs/v2/IgniteHadoopFileSystem.java | 4 +- .../igfs/hadoop/HadoopIgfsInputStream.java | 626 +++++++++++++++++++ .../internal/igfs/hadoop/HadoopIgfsIpcIo.java | 599 ++++++++++++++++++ .../igfs/hadoop/HadoopIgfsIpcIoListener.java | 2 +- .../internal/igfs/hadoop/HadoopIgfsOutProc.java | 466 ++++++++++++++ .../igfs/hadoop/HadoopIgfsOutputStream.java | 201 ++++++ .../internal/igfs/hadoop/HadoopIgfsWrapper.java | 6 +- .../igfs/hadoop/HadoopInputIgfsStream.java | 626 ------------------- .../internal/igfs/hadoop/HadoopIpcIgfsIo.java | 599 ------------------ .../internal/igfs/hadoop/HadoopOutProcIgfs.java | 466 -------------- .../igfs/hadoop/HadoopOutputIgfsStream.java | 201 ------ .../ignite/internal/igfs/hadoop/package.html | 24 - .../apache/ignite/internal/igfs/package.html | 24 - 14 files changed, 1901 insertions(+), 1949 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 9c95437..f7b5dda 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -528,7 +528,7 @@ public class IgniteHadoopFileSystem extends FileSystem { LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + ", bufSize=" + bufSize + ']'); - HadoopInputIgfsStream igfsIn = new HadoopInputIgfsStream(stream, stream.length(), + HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), bufSize, LOG, clientLog, logId); if (LOG.isDebugEnabled()) @@ -599,7 +599,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (LOG.isDebugEnabled()) LOG.debug("Opened output stream in create [path=" + path + ", delegate=" + stream + ']'); - HadoopOutputIgfsStream igfsOut = new HadoopOutputIgfsStream(stream, LOG, clientLog, + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, logId); bufSize = Math.max(64 * 1024, bufSize); @@ -673,7 +673,7 @@ public class IgniteHadoopFileSystem extends FileSystem { if (LOG.isDebugEnabled()) LOG.debug("Opened output stream in append [path=" + path + ", delegate=" + stream + ']'); - HadoopOutputIgfsStream igfsOut = new HadoopOutputIgfsStream(stream, LOG, clientLog, + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, logId); bufSize = Math.max(64 * 1024, bufSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java index 
1c9165c..016a068 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java @@ -467,7 +467,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea LOG.debug("Opening input stream [thread=" + Thread.currentThread().getName() + ", path=" + path + ", bufSize=" + bufSize + ']'); - HadoopInputIgfsStream igfsIn = new HadoopInputIgfsStream(stream, stream.length(), + HadoopIgfsInputStream igfsIn = new HadoopIgfsInputStream(stream, stream.length(), bufSize, LOG, clientLog, logId); if (LOG.isDebugEnabled()) @@ -566,7 +566,7 @@ public class IgniteHadoopFileSystem extends AbstractFileSystem implements Closea assert stream != null; - HadoopOutputIgfsStream igfsOut = new HadoopOutputIgfsStream(stream, LOG, + HadoopIgfsOutputStream igfsOut = new HadoopIgfsOutputStream(stream, LOG, clientLog, logId); bufSize = Math.max(64 * 1024, bufSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java new file mode 100644 index 0000000..6293e2f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInputStream.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.hadoop.fs.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * IGFS input stream wrapper for hadoop interfaces. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public final class HadoopIgfsInputStream extends InputStream implements Seekable, PositionedReadable, + HadoopIgfsStreamEventListener { + /** Minimum buffer size. */ + private static final int MIN_BUF_SIZE = 4 * 1024; + + /** Server stream delegate. */ + private HadoopIgfsStreamDelegate delegate; + + /** Stream ID used by logger. */ + private long logStreamId; + + /** Stream position. */ + private long pos; + + /** Stream read limit. */ + private long limit; + + /** Mark position. */ + private long markPos = -1; + + /** Prefetch buffer. */ + private DoubleFetchBuffer buf = new DoubleFetchBuffer(); + + /** Buffer half size for double-buffering. */ + private int bufHalfSize; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Logger. */ + private Log log; + + /** Client logger. 
*/ + private IgfsLogger clientLog; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** + * Creates input stream. + * + * @param delegate Server stream delegate. + * @param limit Read limit. + * @param bufSize Buffer size. + * @param log Log. + * @param clientLog Client logger. + */ + public HadoopIgfsInputStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log, + IgfsLogger clientLog, long logStreamId) { + assert limit >= 0; + + this.delegate = delegate; + this.limit = limit; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. 
+ */ + private void readEnd() { + long now = System.nanoTime(); + + readTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + checkClosed(); + + readStart(); + + try { + if (eof()) + return -1; + + buf.refreshAhead(pos); + + int res = buf.atPosition(pos); + + pos++; + total++; + + buf.refreshAhead(pos); + + return res; + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { + checkClosed(); + + if (eof()) + return -1; + + readStart(); + + try { + long remaining = limit - pos; + + int read = buf.flatten(b, pos, off, len); + + pos += read; + total += read; + remaining -= read; + + if (remaining > 0 && read != len) { + int readAmt = (int)Math.min(remaining, len - read); + + delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get(); + + read += readAmt; + pos += readAmt; + total += readAmt; + } + + buf.refreshAhead(pos); + + return read; + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + checkClosed(); + + if (clientLog.isLogEnabled()) + clientLog.logSkip(logStreamId, n); + + long oldPos = pos; + + if (pos + n <= limit) + pos += n; + else + pos = limit; + + buf.refreshAhead(pos); + + return pos - oldPos; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + checkClosed(); + + int available = buf.available(pos); + + assert available >= 0; + + return available; + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + readStart(); + + if (log.isDebugEnabled()) + log.debug("Closing input stream: " + delegate); + + 
delegate.hadoop().closeStream(delegate); + + readEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseIn(logStreamId, userTime, readTime, total); + + markClosed(false); + + if (log.isDebugEnabled()) + log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + + ", userTime=" + userTime + ']'); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readLimit) { + markPos = pos; + + if (clientLog.isLogEnabled()) + clientLog.logMark(logStreamId, readLimit); + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + checkClosed(); + + if (clientLog.isLogEnabled()) + clientLog.logReset(logStreamId); + + if (markPos == -1) + throw new IOException("Stream was not marked."); + + pos = markPos; + + buf.refreshAhead(pos); + } + + /** {@inheritDoc} */ + @Override public boolean markSupported() { + return true; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException { + long remaining = limit - position; + + int read = (int)Math.min(len, remaining); + + // Return -1 at EOF. 
+ if (read == 0) + return -1; + + readFully(position, buf, off, read); + + return read; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { + long remaining = limit - position; + + checkClosed(); + + if (len > remaining) + throw new EOFException("End of stream reached before data was fully read."); + + readStart(); + + try { + int read = this.buf.flatten(buf, position, off, len); + + total += read; + + if (read != len) { + int readAmt = len - read; + + delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); + + total += readAmt; + } + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, position, len); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void readFully(long position, byte[] buf) throws IOException { + readFully(position, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + A.ensure(pos >= 0, "position must be non-negative"); + + checkClosed(); + + if (clientLog.isLogEnabled()) + clientLog.logSeek(logStreamId, pos); + + if (pos > limit) + pos = limit; + + if (log.isDebugEnabled()) + log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); + + this.pos = pos; + + buf.refreshAhead(pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() { + return pos; + } + + /** {@inheritDoc} */ + @Override public synchronized boolean seekToNewSource(long targetPos) { + return false; + } + + /** {@inheritDoc} */ + @Override public void onClose() { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + // No-op. + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost. 
+ */ + private void markClosed(boolean connBroken) { + // It is ok to have race here. + if (!closed) { + closed = true; + + this.connBroken = connBroken; + + delegate.hadoop().removeEventListener(delegate); + } + } + + /** + * @throws IOException If check failed. + */ + private void checkClosed() throws IOException { + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** + * @return {@code True} if end of stream reached. + */ + private boolean eof() { + return limit == pos; + } + + /** + * Asynchronous prefetch buffer. + */ + private static class FetchBufferPart { + /** Read future. */ + private GridPlainFuture<byte[]> readFut; + + /** Position of cached chunk in file. */ + private long pos; + + /** Prefetch length. Need to store as read future result might be not available yet. */ + private int len; + + /** + * Creates fetch buffer part. + * + * @param readFut Read future for this buffer. + * @param pos Read position. + * @param len Chunk length. + */ + private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) { + this.readFut = readFut; + this.pos = pos; + this.len = len; + } + + /** + * Copies cached data if specified position matches cached region. + * + * @param dst Destination buffer. + * @param pos Read position in file. + * @param dstOff Offset in destination buffer from which start writing. + * @param len Maximum number of bytes to copy. + * @return Number of bytes copied. + * @throws IgniteCheckedException If read future failed. + */ + public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { + // If read start position is within cached boundaries. 
+ if (contains(pos)) { + byte[] data = readFut.get(); + + int srcPos = (int)(pos - this.pos); + int cpLen = Math.min(len, data.length - srcPos); + + U.arrayCopy(data, srcPos, dst, dstOff, cpLen); + + return cpLen; + } + + return 0; + } + + /** + * @return {@code True} if data is ready to be read. + */ + public boolean ready() { + return readFut.isDone(); + } + + /** + * Checks if current buffer part contains given position. + * + * @param pos Position to check. + * @return {@code True} if position matches buffer region. + */ + public boolean contains(long pos) { + return this.pos <= pos && this.pos + len > pos; + } + } + + private class DoubleFetchBuffer { + /** */ + private FetchBufferPart first; + + /** */ + private FetchBufferPart second; + + /** + * Copies fetched data from both buffers to destination array if cached region matched read position. + * + * @param dst Destination buffer. + * @param pos Read position in file. + * @param dstOff Destination buffer offset. + * @param len Maximum number of bytes to copy. + * @return Number of bytes copied. + * @throws IgniteCheckedException If any read operation failed. + */ + public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { + assert dstOff >= 0; + assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff + + ", len=" + len + ']'; + + int bytesCopied = 0; + + if (first != null) { + bytesCopied += first.flatten(dst, pos, dstOff, len); + + if (bytesCopied != len && second != null) { + assert second.pos == first.pos + first.len; + + bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied); + } + } + + return bytesCopied; + } + + /** + * Gets byte at specified position in buffer. + * + * @param pos Stream position. + * @return Read byte. + * @throws IgniteCheckedException If read failed. 
+ */ + public int atPosition(long pos) throws IgniteCheckedException { + // Should not reach here if stream contains no data. + assert first != null; + + if (first.contains(pos)) { + byte[] bytes = first.readFut.get(); + + return bytes[((int)(pos - first.pos))] & 0xFF; + } + else { + assert second != null; + assert second.contains(pos); + + byte[] bytes = second.readFut.get(); + + return bytes[((int)(pos - second.pos))] & 0xFF; + } + } + + /** + * Starts asynchronous buffer refresh if needed, depending on current position. + * + * @param pos Current stream position. + */ + public void refreshAhead(long pos) { + if (fullPrefetch(pos)) { + first = fetch(pos, bufHalfSize); + second = fetch(pos + bufHalfSize, bufHalfSize); + } + else if (needFlip(pos)) { + first = second; + + second = fetch(first.pos + first.len, bufHalfSize); + } + } + + /** + * @param pos Position from which read is expected. + * @return Number of bytes available to be read without blocking. + */ + public int available(long pos) { + int available = 0; + + if (first != null) { + if (first.contains(pos)) { + if (first.ready()) { + available += (pos - first.pos); + + if (second != null && second.ready()) + available += second.len; + } + } + else { + if (second != null && second.contains(pos) && second.ready()) + available += (pos - second.pos); + } + } + + return available; + } + + /** + * Checks if position shifted enough to forget previous buffer. + * + * @param pos Current position. + * @return {@code True} if need flip buffers. + */ + private boolean needFlip(long pos) { + // Return true once the read position has moved into the second buffer. + return second != null && second.contains(pos); + } + + /** + * Determines if all cached bytes should be discarded and new region should be + * prefetched. + * + * @param curPos Current stream position. + * @return {@code True} if need to refresh both blocks. + */ + private boolean fullPrefetch(long curPos) { + // If no data was prefetched yet, return true. 
+ return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len); + } + + /** + * Starts asynchronous fetch for given region. + * + * @param pos Position to read from. + * @param size Number of bytes to read. + * @return Fetch buffer part. + */ + private FetchBufferPart fetch(long pos, int size) { + long remaining = limit - pos; + + size = (int)Math.min(size, remaining); + + return size <= 0 ? null : + new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java new file mode 100644 index 0000000..47e5763 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +/** + * IO layer implementation based on blocking IPC streams. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public class HadoopIgfsIpcIo implements HadoopIgfsIo { + /** Logger. */ + private Log log; + + /** Request futures map. */ + private ConcurrentMap<Long, HadoopIgfsFuture> reqMap = + new ConcurrentHashMap8<>(); + + /** Request ID counter. */ + private AtomicLong reqIdCnt = new AtomicLong(); + + /** Endpoint. */ + private IpcEndpoint endpoint; + + /** Endpoint output stream. */ + private IgfsDataOutputStream out; + + /** Protocol. */ + private final IgfsMarshaller marsh; + + /** Client reader thread. */ + private Thread reader; + + /** Lock for graceful shutdown. */ + private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); + + /** Stopping flag. */ + private volatile boolean stopping; + + /** Server endpoint address. */ + private final String endpointAddr; + + /** Number of open file system sessions. */ + private final AtomicInteger activeCnt = new AtomicInteger(1); + + /** Event listeners. */ + private final Collection<HadoopIgfsIpcIoListener> lsnrs = + new GridConcurrentHashSet<>(); + + /** Cached connections. 
*/ + private static final ConcurrentMap<String, HadoopIgfsIpcIo> ipcCache = + new ConcurrentHashMap8<>(); + + /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */ + private static final GridStripedLock initLock = new GridStripedLock(32); + + /** + * @param endpointAddr Endpoint. + * @param marsh Protocol. + * @param log Logger to use. + */ + public HadoopIgfsIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) { + assert endpointAddr != null; + assert marsh != null; + + this.endpointAddr = endpointAddr; + this.marsh = marsh; + this.log = log; + } + + /** + * Returns a started and valid instance of this class + * for a given endpoint. + * + * @param log Logger to use for new instance. + * @param endpoint Endpoint string. + * @return New or existing cached instance, which is started and operational. + * @throws IOException If new instance was created but failed to start. + */ + public static HadoopIgfsIpcIo get(Log log, String endpoint) throws IOException { + while (true) { + HadoopIgfsIpcIo clientIo = ipcCache.get(endpoint); + + if (clientIo != null) { + if (clientIo.acquire()) + return clientIo; + else + // If concurrent close. + ipcCache.remove(endpoint, clientIo); + } + else { + Lock lock = initLock.getLock(endpoint); + + lock.lock(); + + try { + clientIo = ipcCache.get(endpoint); + + if (clientIo != null) { // Perform double check. + if (clientIo.acquire()) + return clientIo; + else + // If concurrent close. + ipcCache.remove(endpoint, clientIo); + } + + // Otherwise try creating a new one. + clientIo = new HadoopIgfsIpcIo(endpoint, new IgfsMarshaller(), log); + + try { + clientIo.start(); + } + catch (IgniteCheckedException e) { + throw new IOException(e.getMessage(), e); + } + + HadoopIgfsIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo); + + // Put in exclusive lock. + assert old == null; + + return clientIo; + } + finally { + lock.unlock(); + } + } + } + } + + /** + * Increases usage count for this instance. 
+ * + * @return {@code true} if usage count is greater than zero. + */ + private boolean acquire() { + while (true) { + int cnt = activeCnt.get(); + + if (cnt == 0) { + if (log.isDebugEnabled()) + log.debug("IPC IO not acquired (count was 0): " + this); + + return false; + } + + // Need to make sure that no-one decremented count in between. + if (activeCnt.compareAndSet(cnt, cnt + 1)) { + if (log.isDebugEnabled()) + log.debug("IPC IO acquired: " + this); + + return true; + } + } + } + + /** + * Releases this instance, decrementing usage count. + * <p> + * If usage count becomes zero, the instance is stopped + * and removed from cache. + */ + public void release() { + while (true) { + int cnt = activeCnt.get(); + + if (cnt == 0) { + if (log.isDebugEnabled()) + log.debug("IPC IO not released (count was 0): " + this); + + return; + } + + if (activeCnt.compareAndSet(cnt, cnt - 1)) { + if (cnt == 1) { + ipcCache.remove(endpointAddr, this); + + if (log.isDebugEnabled()) + log.debug("IPC IO stopping as unused: " + this); + + stop(); + } + else if (log.isDebugEnabled()) + log.debug("IPC IO released: " + this); + + return; + } + } + } + + /** + * Closes this IO instance, removing it from cache. + */ + public void forceClose() { + if (ipcCache.remove(endpointAddr, this)) + stop(); + } + + /** + * Starts the IO. + * + * @throws IgniteCheckedException If failed to connect the endpoint. 
+ */ + private void start() throws IgniteCheckedException { + boolean success = false; + + try { + endpoint = IpcEndpointFactory.connectEndpoint( + endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, "")); + + out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); + + reader = new ReaderThread(); + + // Required for Hadoop 2.x + reader.setDaemon(true); + + reader.start(); + + success = true; + } + catch (IgniteCheckedException e) { + IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class); + + if (resEx != null) + throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); + + throw e; + } + finally { + if (!success) + stop(); + } + } + + /** + * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed. + * Close listeners will be invoked as if connection is closed by server. + */ + private void stop() { + close0(null); + + if (reader != null) { + try { + U.interrupt(reader); + U.join(reader); + + reader = null; + } + catch (IgniteInterruptedCheckedException ignored) { + Thread.currentThread().interrupt(); + + log.warn("Got interrupted while waiting for reader thread to shut down (will return)."); + } + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) { + if (!busyLock.readLock().tryLock()) { + lsnr.onClose(); + + return; + } + + boolean invokeNow = false; + + try { + invokeNow = stopping; + + if (!invokeNow) + lsnrs.add(lsnr); + } + finally { + busyLock.readLock().unlock(); + + if (invokeNow) + lsnr.onClose(); + } + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) { + lsnrs.remove(lsnr); + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException { + return send(msg, null, 0, 0); + } + + /** {@inheritDoc} */ + @Override public <T> 
GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, + int outLen) throws IgniteCheckedException { + assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK; + + if (!busyLock.readLock().tryLock()) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " + + "closed)."); + + try { + if (stopping) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " + + "closed)."); + + long reqId = reqIdCnt.getAndIncrement(); + + HadoopIgfsFuture<T> fut = new HadoopIgfsFuture<>(); + + fut.outputBuffer(outBuf); + fut.outputOffset(outOff); + fut.outputLength(outLen); + fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK); + + HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut); + + assert oldFut == null; + + if (log.isDebugEnabled()) + log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']'); + + byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command()); + + IgniteCheckedException err = null; + + try { + synchronized (this) { + marsh.marshall(msg, hdr, out); + + out.flush(); // Blocking operation + sometimes system call. 
+ } + } + catch (IgniteCheckedException e) { + err = e; + } + catch (IOException e) { + err = new HadoopIgfsCommunicationException(e); + } + + if (err != null) { + reqMap.remove(reqId, fut); + + fut.onDone(err); + } + + return fut; + } + finally { + busyLock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException { + if (!busyLock.readLock().tryLock()) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being " + + "concurrently closed)."); + + try { + if (stopping) + throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed)."); + + assert msg.command() == IgfsIpcCommand.WRITE_BLOCK; + + IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; + + byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK); + + U.longToBytes(req.streamId(), hdr, 12); + U.intToBytes(req.length(), hdr, 20); + + synchronized (this) { + out.write(hdr); + out.write(req.data(), (int)req.position(), req.length()); + + out.flush(); + } + } + catch (IOException e) { + throw new HadoopIgfsCommunicationException(e); + } + finally { + busyLock.readLock().unlock(); + } + } + + /** + * Closes client but does not wait. + * + * @param err Error. + */ + private void close0(@Nullable Throwable err) { + busyLock.writeLock().lock(); + + try { + if (stopping) + return; + + stopping = true; + } + finally { + busyLock.writeLock().unlock(); + } + + if (err == null) + err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " + + "is received)."); + + // Clean up resources. + U.closeQuiet(out); + + if (endpoint != null) + endpoint.close(); + + // Unwind futures. We can safely iterate here because no more futures will be added. 
+ Iterator<HadoopIgfsFuture> it = reqMap.values().iterator(); + + while (it.hasNext()) { + HadoopIgfsFuture fut = it.next(); + + fut.onDone(err); + + it.remove(); + } + + for (HadoopIgfsIpcIoListener lsnr : lsnrs) + lsnr.onClose(); + } + + /** + * Do not extend {@code GridThread} to minimize class dependencies. + */ + private class ReaderThread extends Thread { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run() { + // Error to fail pending futures. + Throwable err = null; + + try { + InputStream in = endpoint.inputStream(); + + IgfsDataInputStream dis = new IgfsDataInputStream(in); + + byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; + byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE]; + + while (!Thread.currentThread().isInterrupted()) { + dis.readFully(hdr); + + long reqId = U.bytesToLong(hdr, 0); + + // We don't wait for write responses, therefore reqId is -1. + if (reqId == -1) { + // We received a response which normally should not be sent. It must contain an error. + dis.readFully(msgHdr); + + assert msgHdr[4] != 0; + + String errMsg = dis.readUTF(); + + // Error code. + dis.readInt(); + + long streamId = dis.readLong(); + + for (HadoopIgfsIpcIoListener lsnr : lsnrs) + lsnr.onError(streamId, errMsg); + } + else { + HadoopIgfsFuture<Object> fut = reqMap.remove(reqId); + + if (fut == null) { + String msg = "Failed to read response from server: response closure is unavailable for " + + "requestId (will close connection):" + reqId; + + log.warn(msg); + + err = new IgniteCheckedException(msg); + + break; + } + else { + try { + IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8)); + + if (log.isDebugEnabled()) + log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']'); + + Object res = null; + + if (fut.read()) { + dis.readFully(msgHdr); + + boolean hasErr = msgHdr[4] != 0; + + if (hasErr) { + String errMsg = dis.readUTF(); + + // Error code. 
+ Integer errCode = dis.readInt(); + + IgfsControlResponse.throwError(errCode, errMsg); + } + + int blockLen = U.bytesToInt(msgHdr, 5); + + int readLen = Math.min(blockLen, fut.outputLength()); + + if (readLen > 0) { + assert fut.outputBuffer() != null; + + dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen); + } + + if (readLen != blockLen) { + byte[] buf = new byte[blockLen - readLen]; + + dis.readFully(buf); + + res = buf; + } + } + else + res = marsh.unmarshall(cmd, hdr, dis); + + fut.onDone(res); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to apply response closure (will fail request future): " + + e.getMessage()); + + fut.onDone(e); + + err = e; + } + } + } + } + } + catch (EOFException ignored) { + err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer)."); + } + catch (IOException e) { + if (!stopping) + log.error("Failed to read data (connection will be closed)", e); + + err = new HadoopIgfsCommunicationException(e); + } + catch (IgniteCheckedException e) { + if (!stopping) + log.error("Failed to obtain endpoint input stream (connection will be closed)", e); + + err = e; + } + finally { + close0(err); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt + + ", stopping=" + stopping + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java index 10d764e..049e2b7 100644 --- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.igfs.hadoop; /** - * Listens to the events of {@link HadoopIpcIgfsIo}. + * Listens to the events of {@link HadoopIgfsIpcIo}. */ public interface HadoopIgfsIpcIoListener { /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java new file mode 100644 index 0000000..4cfacb9 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.util.lang.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*; + +/** + * Communication with external process (TCP or shmem). + */ +public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener { + /** Expected result is boolean. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure(); + + /** Expected result is long. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure(); + + /** Expected result is {@code IgfsHandshakeResponse}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); + + /** Expected result is {@code IgfsStatus}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES = + createClosure(); + + /** Expected result is {@code IgfsInputStreamDescriptor}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); + + /** Expected result is a collection of {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsFile>> FILE_COL_RES = createClosure(); + + /** Expected result is a collection of {@code IgfsPath}.
*/ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsPath>> PATH_COL_RES = createClosure(); + + /** Expected result is {@code IgfsPathSummary}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES = + createClosure(); + + /** Expected result is a collection of {@code IgfsBlockLocation}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure(); + + /** Grid name. */ + private final String grid; + + /** IGFS name. */ + private final String igfs; + + /** Client log. */ + private final Log log; + + /** Client IO. */ + private final HadoopIgfsIpcIo io; + + /** Event listeners. */ + private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>(); + + /** + * Constructor for TCP endpoint. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException { + this(host, port, grid, igfs, false, log); + } + + /** + * Constructor for shmem endpoint. + * + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException { + this(null, port, grid, igfs, true, log); + } + + /** + * Constructor. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param igfs IGFS name. + * @param shmem Shared memory flag. + * @param log Client logger. + * @throws IOException If failed.
+ */ + private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log) + throws IOException { + assert host != null && !shmem || host == null && shmem : + "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; + + String endpoint = host != null ? host + ":" + port : "shmem:" + port; + + this.grid = grid; + this.igfs = igfs; + this.log = log; + + io = HadoopIgfsIpcIo.get(log, endpoint); + + io.addEventListener(this); + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { + final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); + + req.gridName(grid); + req.igfsName(igfs); + req.logDirectory(logDir); + + return io.send(req).chain(HANDSHAKE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + assert io != null; + + io.removeEventListener(this); + + if (force) + io.forceClose(); + else + io.release(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(INFO); + msg.path(path); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(UPDATE); + msg.path(path); + msg.properties(props); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(SET_TIMES); + msg.path(path); + msg.accessTime(accessTime); + msg.modificationTime(modificationTime); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override 
public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(RENAME); + msg.path(src); + msg.destinationPath(dest); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(DELETE); + msg.path(path); + msg.flag(recursive); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(AFFINITY); + msg.path(path); + msg.start(start); + msg.length(len); + + return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(PATH_SUMMARY); + msg.path(path); + + return io.send(msg).chain(SUMMARY_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(MAKE_DIRECTORIES); + msg.path(path); + msg.properties(props); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_FILES); + msg.path(path); + + return io.send(msg).chain(FILE_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + final 
IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_PATHS); + msg.path(path); + + return io.send(msg).chain(PATH_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get(); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(false); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate open(IgfsPath path, + int seqReadsBeforePrefetch) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(true); + msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_CREATE); + msg.path(path); + msg.flag(overwrite); + msg.colocate(colocate); + msg.properties(props); + msg.replication(replication); + msg.blockSize(blockSize); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new HadoopIgfsStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create, + @Nullable 
Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_APPEND); + msg.path(path); + msg.flag(create); + msg.properties(props); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new HadoopIgfsStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len, + final @Nullable byte[] outBuf, final int outOff, final int outLen) { + assert len > 0; + + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(READ_BLOCK); + msg.streamId((long) desc.target()); + msg.position(pos); + msg.length(len); + + try { + return io.send(msg, outBuf, outOff, outLen); + } + catch (IgniteCheckedException e) { + return new GridPlainFutureAdapter<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len) + throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(WRITE_BLOCK); + msg.streamId((long) desc.target()); + msg.data(data); + msg.position(off); + msg.length(len); + + try { + io.sendPlain(msg); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(CLOSE); + msg.streamId((long)desc.target()); + + try { + io.send(msg).chain(BOOL_RES).get(); + } + catch (IgniteCheckedException e) { + throw HadoopIgfsUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(HadoopIgfsStreamDelegate desc, + HadoopIgfsStreamEventListener lsnr) { + long streamId = desc.target(); + + HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) { + long streamId = desc.target(); + + HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void onClose() { + for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onError(long streamId, String errMsg) { + HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId); + + if (lsnr != null) + lsnr.onError(errMsg); + else + log.warn("Received write error response for not registered output stream (will ignore) " + + "[streamId= " + streamId + ']'); + } + + /** + * Creates conversion closure for given type. + * + * @param <T> Type of expected result. + * @return Conversion closure. 
+ */ + @SuppressWarnings("unchecked") + private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() { + return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() { + @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException { + IgfsControlResponse res = (IgfsControlResponse)fut.get(); + + if (res.hasError()) + res.throwError(); + + return (T)res.response(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java new file mode 100644 index 0000000..9e08bdb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * IGFS Hadoop output stream implementation. + */ +public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener { + /** Log instance. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Log stream ID. */ + private long logStreamId; + + /** Server stream delegate. */ + private HadoopIgfsStreamDelegate delegate; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Error message. */ + private volatile String errMsg; + + /** Write time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** + * Creates light output stream. + * + * @param delegate Server stream delegate. + * @param log Logger to use. + * @param clientLog Client logger. + */ + public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log, + IgfsLogger clientLog, long logStreamId) { + this.delegate = delegate; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Write start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Write end.
+ */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { + check(); + + writeStart(); + + try { + delegate.hadoop().writeData(delegate, b, off, len); + + total += len; + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + write(new byte[] {(byte)b}); + + total++; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IOException { + delegate.hadoop().flush(delegate); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (!closed) { + if (log.isDebugEnabled()) + log.debug("Closing output stream: " + delegate); + + writeStart(); + + delegate.hadoop().closeStream(delegate); + + markClosed(false); + + writeEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + + if (log.isDebugEnabled()) + log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + + ", userTime=" + userTime / 1000 + ']'); + } + else if(connBroken) + throw new IOException( + "Failed to close stream, because connection was broken (data could have been lost)."); + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost. + */ + private void markClosed(boolean connBroken) { + // It is ok to have race here. + if (!closed) { + closed = true; + + delegate.hadoop().removeEventListener(delegate); + + this.connBroken = connBroken; + } + } + + /** + * @throws IOException If check failed. 
+ */ + private void check() throws IOException { + String errMsg0 = errMsg; + + if (errMsg0 != null) + throw new IOException(errMsg0); + + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** {@inheritDoc} */ + @Override public void onClose() throws IgniteCheckedException { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + this.errMsg = errMsg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java index 94a4449..69e6503 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsWrapper.java @@ -384,7 +384,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopOutProcIgfs(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } @@ -408,7 +408,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop = new HadoopOutProcIgfs(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), + hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(), log); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); @@ -430,7 +430,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { HadoopIgfsEx hadoop = null; try { - hadoop 
= new HadoopOutProcIgfs(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); + hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log); curDelegate = new Delegate(hadoop, hadoop.handshake(logDir)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1ffc10f/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java deleted file mode 100644 index 5a008bd..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopInputIgfsStream.java +++ /dev/null @@ -1,626 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.igfs.hadoop; - -import org.apache.commons.logging.*; -import org.apache.hadoop.fs.*; -import org.apache.ignite.*; -import org.apache.ignite.internal.igfs.common.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; - -/** - * IGFS input stream wrapper for hadoop interfaces. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public final class HadoopInputIgfsStream extends InputStream implements Seekable, PositionedReadable, - HadoopIgfsStreamEventListener { - /** Minimum buffer size. */ - private static final int MIN_BUF_SIZE = 4 * 1024; - - /** Server stream delegate. */ - private HadoopIgfsStreamDelegate delegate; - - /** Stream ID used by logger. */ - private long logStreamId; - - /** Stream position. */ - private long pos; - - /** Stream read limit. */ - private long limit; - - /** Mark position. */ - private long markPos = -1; - - /** Prefetch buffer. */ - private DoubleFetchBuffer buf = new DoubleFetchBuffer(); - - /** Buffer half size for double-buffering. */ - private int bufHalfSize; - - /** Closed flag. */ - private volatile boolean closed; - - /** Flag set if stream was closed due to connection breakage. */ - private boolean connBroken; - - /** Logger. */ - private Log log; - - /** Client logger. */ - private IgfsLogger clientLog; - - /** Read time. */ - private long readTime; - - /** User time. */ - private long userTime; - - /** Last timestamp. */ - private long lastTs; - - /** Amount of read bytes. */ - private long total; - - /** - * Creates input stream. - * - * @param delegate Server stream delegate. - * @param limit Read limit. - * @param bufSize Buffer size. - * @param log Log. - * @param clientLog Client logger. 
- */ - public HadoopInputIgfsStream(HadoopIgfsStreamDelegate delegate, long limit, int bufSize, Log log, - IgfsLogger clientLog, long logStreamId) { - assert limit >= 0; - - this.delegate = delegate; - this.limit = limit; - this.log = log; - this.clientLog = clientLog; - this.logStreamId = logStreamId; - - bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); - - lastTs = System.nanoTime(); - - delegate.hadoop().addEventListener(delegate, this); - } - - /** - * Read start. - */ - private void readStart() { - long now = System.nanoTime(); - - userTime += now - lastTs; - - lastTs = now; - } - - /** - * Read end. - */ - private void readEnd() { - long now = System.nanoTime(); - - readTime += now - lastTs; - - lastTs = now; - } - - /** {@inheritDoc} */ - @Override public synchronized int read() throws IOException { - checkClosed(); - - readStart(); - - try { - if (eof()) - return -1; - - buf.refreshAhead(pos); - - int res = buf.atPosition(pos); - - pos++; - total++; - - buf.refreshAhead(pos); - - return res; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { - checkClosed(); - - if (eof()) - return -1; - - readStart(); - - try { - long remaining = limit - pos; - - int read = buf.flatten(b, pos, off, len); - - pos += read; - total += read; - remaining -= read; - - if (remaining > 0 && read != len) { - int readAmt = (int)Math.min(remaining, len - read); - - delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get(); - - read += readAmt; - pos += readAmt; - total += readAmt; - } - - buf.refreshAhead(pos); - - return read; - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public synchronized long skip(long n) throws IOException { - checkClosed(); - - if 
(clientLog.isLogEnabled()) - clientLog.logSkip(logStreamId, n); - - long oldPos = pos; - - if (pos + n <= limit) - pos += n; - else - pos = limit; - - buf.refreshAhead(pos); - - return pos - oldPos; - } - - /** {@inheritDoc} */ - @Override public synchronized int available() throws IOException { - checkClosed(); - - int available = buf.available(pos); - - assert available >= 0; - - return available; - } - - /** {@inheritDoc} */ - @Override public synchronized void close() throws IOException { - if (!closed) { - readStart(); - - if (log.isDebugEnabled()) - log.debug("Closing input stream: " + delegate); - - delegate.hadoop().closeStream(delegate); - - readEnd(); - - if (clientLog.isLogEnabled()) - clientLog.logCloseIn(logStreamId, userTime, readTime, total); - - markClosed(false); - - if (log.isDebugEnabled()) - log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + - ", userTime=" + userTime + ']'); - } - } - - /** {@inheritDoc} */ - @Override public synchronized void mark(int readLimit) { - markPos = pos; - - if (clientLog.isLogEnabled()) - clientLog.logMark(logStreamId, readLimit); - } - - /** {@inheritDoc} */ - @Override public synchronized void reset() throws IOException { - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logReset(logStreamId); - - if (markPos == -1) - throw new IOException("Stream was not marked."); - - pos = markPos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public boolean markSupported() { - return true; - } - - /** {@inheritDoc} */ - @Override public synchronized int read(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - int read = (int)Math.min(len, remaining); - - // Return -1 at EOF. 
- if (read == 0) - return -1; - - readFully(position, buf, off, read); - - return read; - } - - /** {@inheritDoc} */ - @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { - long remaining = limit - position; - - checkClosed(); - - if (len > remaining) - throw new EOFException("End of stream reached before data was fully read."); - - readStart(); - - try { - int read = this.buf.flatten(buf, position, off, len); - - total += read; - - if (read != len) { - int readAmt = len - read; - - delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); - - total += readAmt; - } - - if (clientLog.isLogEnabled()) - clientLog.logRandomRead(logStreamId, position, len); - } - catch (IgniteCheckedException e) { - throw HadoopIgfsUtils.cast(e); - } - finally { - readEnd(); - } - } - - /** {@inheritDoc} */ - @Override public void readFully(long position, byte[] buf) throws IOException { - readFully(position, buf, 0, buf.length); - } - - /** {@inheritDoc} */ - @Override public synchronized void seek(long pos) throws IOException { - A.ensure(pos >= 0, "position must be non-negative"); - - checkClosed(); - - if (clientLog.isLogEnabled()) - clientLog.logSeek(logStreamId, pos); - - if (pos > limit) - pos = limit; - - if (log.isDebugEnabled()) - log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); - - this.pos = pos; - - buf.refreshAhead(pos); - } - - /** {@inheritDoc} */ - @Override public synchronized long getPos() { - return pos; - } - - /** {@inheritDoc} */ - @Override public synchronized boolean seekToNewSource(long targetPos) { - return false; - } - - /** {@inheritDoc} */ - @Override public void onClose() { - markClosed(true); - } - - /** {@inheritDoc} */ - @Override public void onError(String errMsg) { - // No-op. - } - - /** - * Marks stream as closed. - * - * @param connBroken {@code True} if connection with server was lost. 
*/
    private void markClosed(boolean connBroken) {
        // It is ok to have race here.
        if (!closed) {
            closed = true;

            this.connBroken = connBroken;

            delegate.hadoop().removeEventListener(delegate);
        }
    }

    /**
     * Fails if the stream has been marked closed; the message tells a lost connection from a regular close.
     *
     * @throws IOException If check failed.
     */
    private void checkClosed() throws IOException {
        if (closed) {
            if (connBroken)
                throw new IOException("Server connection was lost.");
            else
                throw new IOException("Stream is closed.");
        }
    }

    /**
     * @return {@code True} if end of stream reached.
     */
    private boolean eof() {
        return limit == pos;
    }

    /**
     * Asynchronous prefetch buffer.
     */
    private static class FetchBufferPart {
        /** Read future. */
        private GridPlainFuture<byte[]> readFut;

        /** Position of cached chunk in file. */
        private long pos;

        /** Prefetch length. Need to store as read future result might be not available yet. */
        private int len;

        /**
         * Creates fetch buffer part.
         *
         * @param readFut Read future for this buffer.
         * @param pos Read position.
         * @param len Chunk length.
         */
        private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) {
            this.readFut = readFut;
            this.pos = pos;
            this.len = len;
        }

        /**
         * Copies cached data if specified position matches cached region.
         *
         * @param dst Destination buffer.
         * @param pos Read position in file.
         * @param dstOff Offset in destination buffer from which start writing.
         * @param len Maximum number of bytes to copy.
         * @return Number of bytes copied, {@code 0} if the position is outside this part.
         * @throws IgniteCheckedException If read future failed.
         */
        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
            // If read start position is within cached boundaries.
            if (contains(pos)) {
                byte[] data = readFut.get();

                int srcPos = (int)(pos - this.pos);
                int cpLen = Math.min(len, data.length - srcPos);

                U.arrayCopy(data, srcPos, dst, dstOff, cpLen);

                return cpLen;
            }

            return 0;
        }

        /**
         * @return {@code True} if data is ready to be read.
         */
        public boolean ready() {
            return readFut.isDone();
        }

        /**
         * Checks if current buffer part contains given position.
         *
         * @param pos Position to check.
         * @return {@code True} if position matches buffer region.
         */
        public boolean contains(long pos) {
            return this.pos <= pos && this.pos + len > pos;
        }
    }

    /**
     * Prefetch buffer made of two parts, where {@code second} is fetched starting at the offset
     * at which {@code first} ends. Not static: it reads {@code limit}, {@code delegate} and
     * {@code bufHalfSize} from the enclosing scope.
     */
    private class DoubleFetchBuffer {
        /** First part; {@code null} until something has been fetched. */
        private FetchBufferPart first;

        /** Second part, following the first one; may be {@code null}. */
        private FetchBufferPart second;

        /**
         * Copies fetched data from both buffers to destination array if cached region matched read position.
         *
         * @param dst Destination buffer.
         * @param pos Read position in file.
         * @param dstOff Destination buffer offset.
         * @param len Maximum number of bytes to copy.
         * @return Number of bytes copied.
         * @throws IgniteCheckedException If any read operation failed.
         */
        public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException {
            assert dstOff >= 0;
            assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff +
                ", len=" + len + ']';

            int bytesCopied = 0;

            if (first != null) {
                bytesCopied += first.flatten(dst, pos, dstOff, len);

                // Continue into the second part from where the first one stopped.
                if (bytesCopied != len && second != null) {
                    assert second.pos == first.pos + first.len;

                    bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied);
                }
            }

            return bytesCopied;
        }

        /**
         * Gets byte at specified position in buffer.
         *
         * @param pos Stream position.
         * @return Read byte as an unsigned value.
         * @throws IgniteCheckedException If read failed.
         */
        public int atPosition(long pos) throws IgniteCheckedException {
            // Should not reach here if stream contains no data.
            assert first != null;

            if (first.contains(pos)) {
                byte[] bytes = first.readFut.get();

                return bytes[((int)(pos - first.pos))] & 0xFF;
            }
            else {
                assert second != null;
                assert second.contains(pos);

                byte[] bytes = second.readFut.get();

                return bytes[((int)(pos - second.pos))] & 0xFF;
            }
        }

        /**
         * Starts asynchronous buffer refresh if needed, depending on current position.
         *
         * @param pos Current stream position.
         */
        public void refreshAhead(long pos) {
            if (fullPrefetch(pos)) {
                first = fetch(pos, bufHalfSize);
                second = fetch(pos + bufHalfSize, bufHalfSize);
            }
            else if (needFlip(pos)) {
                // The old second part becomes the first one; fetch the region right after it.
                first = second;

                second = fetch(first.pos + first.len, bufHalfSize);
            }
        }

        /**
         * @param pos Position from which read is expected.
         * @return Number of bytes available to be read without blocking.
         */
        public int available(long pos) {
            int available = 0;

            if (first != null) {
                if (first.contains(pos)) {
                    if (first.ready()) {
                        // Bytes left in the first part from pos to its end (not the bytes already consumed).
                        available += (int)(first.pos + first.len - pos);

                        if (second != null && second.ready())
                            available += second.len;
                    }
                }
                else {
                    // Bytes left in the second part from pos to its end.
                    if (second != null && second.contains(pos) && second.ready())
                        available += (int)(second.pos + second.len - pos);
                }
            }

            return available;
        }

        /**
         * Checks if position shifted enough to forget previous buffer.
         *
         * @param pos Current position.
         * @return {@code True} if need flip buffers.
         */
        private boolean needFlip(long pos) {
            // Flip as soon as the position falls inside the second part.
            return second != null && second.contains(pos);
        }

        /**
         * Determines if all cached bytes should be discarded and new region should be
         * prefetched.
         *
         * @param curPos Current stream position.
         * @return {@code True} if need to refresh both blocks.
         */
        private boolean fullPrefetch(long curPos) {
            // If no data was prefetched yet, return true.
            // Also true when the position is before the first part or at/after the end of the second one.
            return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len);
        }

        /**
         * Starts asynchronous fetch for given region.
         *
         * @param pos Position to read from.
         * @param size Number of bytes to read.
         * @return Fetch buffer part, or {@code null} if nothing is left to read before the limit.
         */
        private FetchBufferPart fetch(long pos, int size) {
            long remaining = limit - pos;

            // Do not request bytes past the limit.
            size = (int)Math.min(size, remaining);

            return size <= 0 ? null :
                new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size);
        }
    }
}