[
https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18007998#comment-18007998
]
ASF GitHub Bot commented on HADOOP-19613:
-----------------------------------------
anmolanmol1234 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2215342272
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -15,636 +15,166 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.fs.azurebfs.services;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
/**
- * The Read Buffer Manager for Rest AbfsClient.
+ * Interface for managing read buffers for Azure Blob File System input
streams.
*/
-final class ReadBufferManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ReadBufferManager.class);
- private static final int ONE_KB = 1024;
- private static final int ONE_MB = ONE_KB * ONE_KB;
-
- private static final int NUM_BUFFERS = 16;
- private static final int NUM_THREADS = 8;
- private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have
to see if 3 seconds is a good threshold
-
- private static int blockSize = 4 * ONE_MB;
- private static int thresholdAgeMilliseconds =
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data
that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
-
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of
requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); //
requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); //
buffers available for reading
- private static ReadBufferManager bufferManager; // singleton, initialized in
static initialization block
- private static final ReentrantLock LOCK = new ReentrantLock();
-
- static ReadBufferManager getBufferManager() {
- if (bufferManager == null) {
- LOCK.lock();
- try {
- if (bufferManager == null) {
- bufferManager = new ReadBufferManager();
- bufferManager.init();
- }
- } finally {
- LOCK.unlock();
- }
- }
- return bufferManager;
- }
-
- static void setReadBufferManagerConfigs(int readAheadBlockSize) {
- if (bufferManager == null) {
- LOGGER.debug(
- "ReadBufferManager not initialized yet. Overriding
readAheadBlockSize as {}",
- readAheadBlockSize);
- blockSize = readAheadBlockSize;
- }
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[blockSize]; // same buffers are reused. The byte
array never goes back to GC
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("ABFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
- }
-
-
- /*
- *
- * AbfsInputStream-facing methods
- *
- */
-
+public interface ReadBufferManager {
/**
- * {@link AbfsInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link AbfsInputStream} for which to do the
read-ahead
- * @param requestedOffset The offset in the file which should be read
- * @param requestedLength The length to read
+ * Queues a read-ahead request from {@link AbfsInputStream}
+ * for a given offset in file and given length.
+ * @param stream the input stream requesting the read-ahead
+ * @param requestedOffset the offset in the remote file to start reading
+ * @param requestedLength the number of bytes to read from file
+ * @param tracingContext the tracing context for diagnostics
*/
- void queueReadAhead(final AbfsInputStream stream, final long
requestedOffset, final int requestedLength,
- TracingContext tracingContext) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
- buffer.setTracingContext(tracingContext);
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we
have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx
{}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
- }
+ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+ final int requestedLength, TracingContext tracingContext);
+ /**
+ * Gets a block of data from the prefetched data by ReadBufferManager.
+ * {@link AbfsInputStream} calls this method to read data
+ * @param stream the input stream requesting the block
+ * @param position the position in the file to read from
+ * @param length the number of bytes to read
+ * @param buffer the buffer to store the read data
+ * @return the number of bytes actually read
+ * @throws IOException if an I/O error occurs
+ */
+ int getBlock(final AbfsInputStream stream,
+ final long position,
+ final int length,
+ final byte[] buffer)
+ throws IOException;
/**
- * {@link AbfsInputStream} calls this method read any bytes already
available in a buffer (thereby saving a
- * remote read). This returns the bytes if the data already exists in
buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes.
If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead
and reports cache miss. This is because
- * depending on worker thread availability, the read-ahead may take a while
- the calling thread can do its own
- read to get the data faster (compared to the read waiting in queue for an
indeterminate amount of time).
+ * {@link ReadBufferWorker} calls this to get the next buffer to read from
read-ahead queue.
+ * Requested read will be performed by background thread.
*
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will
be written into from offset 0.
- * @return the number of bytes read
+ * @return the next {@link ReadBuffer} to read
+ * @throws InterruptedException if interrupted while waiting
*/
- int getBlock(final AbfsInputStream stream, final long position, final int
length, final byte[] buffer)
- throws IOException {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
+ ReadBuffer getNextBlockToRead() throws InterruptedException;
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
- *
+ /**
+ * Marks the specified buffer as done reading and updates its status.
+ * Called by {@link ReadBufferWorker} after reading is complete.
+ * @param buffer the buffer that was read by worker thread
+ * @param result the status of the read operation
+ * @param bytesActuallyRead the number of bytes actually read
*/
-
- private void waitForProcess(final AbfsInputStream stream, final long
position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if in in-progress queue, then block for
it
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {}
buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's
thread
- // Note on correctness: readBuf gets out of inProgressList only in 1
place: after worker thread
- // is done processing it (in doneReading). There, the latch is set
after removing the buffer from
- // inProgressList. So this latch is safe to be outside the
synchronized block.
- // Putting it in synchronized would result in a deadlock, since this
thread would be holding the lock
- // while waiting, so no one will be able to change any state. If this
becomes more complex in the future,
- then the latch can be removed and replaced with wait/notify
whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
+ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
+ final int bytesActuallyRead);
/**
- * If any buffer in the completedlist can be reclaimed then reclaim it and
return the buffer to free list.
- * The objective is to find just one buffer - there is no advantage to
evicting more than one.
+ * Purges all buffers associated with the calling {@link AbfsInputStream}.
*
- * @return whether the eviction succeeded - i.e., were we able to free up
one buffer
+ * @param stream the input stream whose buffers should be purged
*/
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- long currentTimeInMs = currentTimeMillis();
-
- // first, try buffers where all bytes have been consumed (approximated as
first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad
idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
+ void purgeBuffersForStream(AbfsInputStream stream);
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- // Failed read buffers (with buffer index=-1) that are older than
- // thresholdAge should be cleaned up, but at the same time should not
- // report successful eviction.
- // Queue logic expects that a buffer is freed up for read ahead when
- // eviction is successful, whereas a failed ReadBuffer would have released
- // its buffer when its status was set to READ_FAILED.
- long earliestBirthday = Long.MAX_VALUE;
- ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
- for (ReadBuffer buf : completedReadList) {
- if ((buf.getBufferindex() != -1)
- && (buf.getTimeStamp() < earliestBirthday)) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- } else if ((buf.getBufferindex() == -1)
- && (currentTimeInMs - buf.getTimeStamp()) >
thresholdAgeMilliseconds) {
- oldFailedBuffers.add(buf);
- }
- }
-
- for (ReadBuffer buf : oldFailedBuffers) {
- evict(buf);
- }
-
- if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) &&
(nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- LOGGER.trace("No buffer eligible for eviction");
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
- // avoid adding it to freeList.
- if (buf.getBufferindex() != -1) {
- freeList.push(buf.getBufferindex());
- }
-
- completedReadList.remove(buf);
- buf.setTracingContext(null);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {}
length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(),
buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final AbfsInputStream stream, final long
requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final
AbfsInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final
AbfsInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() +
buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
+ // Following methods are for testing purposes only and should not be used in
production code.
/**
- * Returns buffers that failed or passed from completed queue.
- * @param stream
- * @param requestedOffset
- * @return
+ * Resets the read buffer manager for testing purposes.
*/
- private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream,
final long requestedOffset) {
- for (ReadBuffer buffer : completedReadList) {
- // Buffer is returned if the requestedOffset is at or above buffer's
- // offset but less than buffer's length or the actual requestedLength
- if ((buffer.getStream() == stream)
- && (requestedOffset >= buffer.getOffset())
- && ((requestedOffset < buffer.getOffset() + buffer.getLength())
- || (requestedOffset < buffer.getOffset() +
buffer.getRequestedLength()))) {
- return buffer;
- }
- }
-
- return null;
- }
-
- private void clearFromReadAheadQueue(final AbfsInputStream stream, final
long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final AbfsInputStream stream, final
long position, final int length,
- final byte[] buffer) throws
IOException {
- ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
-
- if (buf == null) {
- return 0;
- }
-
- if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
- // To prevent new read requests to fail due to old read-ahead attempts,
- // return exception only from buffers that failed within last
thresholdAgeMilliseconds
- if ((currentTimeMillis() - (buf.getTimeStamp()) <
thresholdAgeMilliseconds)) {
- throw buf.getErrException();
- } else {
- return 0;
- }
- }
-
- if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
- || (position >= buf.getOffset() + buf.getLength())) {
- return 0;
- }
-
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
+ @VisibleForTesting
+ void testResetReadBufferManager();
Review Comment:
why does the method name in production have test ?
> ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code with the
> current working code
> ------------------------------------------------------------------------------------------------
>
> Key: HADOOP-19613
> URL: https://issues.apache.org/jira/browse/HADOOP-19613
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
> Read Buffer Manager used today was introduced way back and has been stable
> for quite a while.
> Read Buffer Manager to be introduced as part of
> https://issues.apache.org/jira/browse/HADOOP-19596 will introduce many
> changes incrementally over time. While the development goes on and we are
> able to fully stabilise the optimized version we need the current flow to be
> functional and undisturbed.
> This work item is to isolate that from new code by refactoring
> ReadBufferManager class to have 2 different implementations with same public
> interfaces: ReadBufferManagerV1 and ReadBufferManagerV2.
> This will also introduce new configs that can be used to toggle between new
> and old code.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]