[ https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033434#comment-18033434 ]
ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------
anujmodi2021 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2467906251
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
executorServiceKeepAliveTimeInMilliSec,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
- namedThreadFactory);
+ workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, this);
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
workerPool.submit(worker);
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+ if (isDynamicScalingEnabled) {
+ cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+ getCpuMonitoringIntervalInMilliSec(),
+ getCpuMonitoringIntervalInMilliSec(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {}
worker threads",
+ numberOfActiveBuffers, workerRefs.size());
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to queue a read-ahead.
+ * @param stream The stream from which the read-ahead is requested.
+ * @param requestedOffset The offset in the file which should be read.
+ * @param requestedLength The length to read.
*/
@Override
- public void queueReadAhead(final AbfsInputStream stream,
- final long requestedOffset,
- final int requestedLength,
- final TracingContext tracingContext) {
- // TODO: To be implemented
+ public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+ final int requestedLength, TracingContext tracingContext) {
+ printTraceLog("Start Queueing readAhead for file: {}, with eTag: {},
offset: {}, length: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset, requestedLength,
stream.hashCode());
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+ // Already queued for this offset, so skip queuing.
+ printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {},
offset: {}, triggered by stream: {} as it is already queued",
+ stream.getPath(), stream.getETag(), requestedOffset,
stream.hashCode());
+ return;
+ }
+ if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+ // No buffers are available and more buffers cannot be created. Skip queuing.
+ printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+ stream.getPath(), stream.getETag(), requestedOffset, stream.hashCode());
+ return;
+ }
+
+ // Create a new ReadBuffer to keep the prefetched data and queue.
+ buffer = new ReadBuffer();
+ buffer.setStream(stream); // To map buffer with stream that requested it
+ buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+ buffer.setPath(stream.getPath());
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+ buffer.setTracingContext(tracingContext);
+
+ if (isFreeListEmpty()) {
+ /*
+ * By now there should be at least one buffer available.
+ * This is to make doubly sure that, after upscaling or eviction,
+ * we still have a free buffer available. If not, we skip queueing.
+ */
+ return;
+ }
+ Integer bufferIndex = popFromFreeList();
+ buffer.setBuffer(bufferPool[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ getReadAheadQueue().add(buffer);
+ notifyAll();
+ printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset:
{}, buffer idx: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset,
buffer.getBufferindex(), stream.hashCode());
+ }
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
+ * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
+ *
+ * @param stream the stream of the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
*/
@Override
- public int getBlock(final AbfsInputStream stream,
- final long position,
- final int length,
- final byte[] buffer) throws IOException {
- // TODO: To be implemented
+ public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+ throws IOException {
+ // not synchronized, so have to be careful with locking
+ printTraceLog("getBlock request for file: {}, with eTag: {}, for position:
{} for length: {} received from stream: {}",
+ stream.getPath(), stream.getETag(), position, length,
stream.hashCode());
+
+ String requestedETag = stream.getETag();
+ boolean isFirstRead = stream.isFirstRead();
+
+ // Wait for any in-progress read to complete.
+ waitForProcess(requestedETag, position, isFirstRead);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, buffer);
+ }
+ if (bytesRead > 0) {
+ printTraceLog("Done read from Cache for the file with eTag: {},
position: {}, length: {}, requested by stream: {}",
+ requestedETag, position, bytesRead, stream.hashCode());
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if the thread is interrupted
*/
@Override
public ReadBuffer getNextBlockToRead() throws InterruptedException {
- // TODO: To be implemented
- return null;
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ // Blocking Call to wait for prefetch to be queued.
+ while (getReadAheadQueue().size() == 0) {
+ wait();
+ }
+
+ buffer = getReadAheadQueue().remove();
+ notifyAll();
+ if (buffer == null) {
+ return null;
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ getInProgressList().add(buffer);
+ }
+ printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for
offset: {}, queued by stream: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
buffer.getStream().hashCode());
+ return buffer;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this method to post completion.
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
@Override
- public void doneReading(final ReadBuffer buffer,
- final ReadBufferStatus result,
+ public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
final int bytesActuallyRead) {
- // TODO: To be implemented
+ printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag:
{}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
buffer.getStream().hashCode(), result, bytesActuallyRead);
+ synchronized (this) {
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (getInProgressList().contains(buffer)) {
+ getInProgressList().remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ // Successful read, so update the buffer status and length
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ // Failed read, reuse buffer for next read, this buffer will be
+ // evicted later based on eviction policy.
+ pushToFreeList(buffer.getBufferindex());
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ getCompletedReadList().add(buffer);
+ }
+ }
+
+ // Outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results.
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
- * {@inheritDoc}
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManagerV2} when the stream is closed.
+ * @param stream input stream.
*/
- @Override
- public void purgeBuffersForStream(final AbfsInputStream stream) {
- // TODO: To be implemented
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+ getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() ==
stream);
+ purgeList(stream, getCompletedReadList());
+ }
+
+ private boolean isAlreadyQueued(final String eTag, final long requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+ || isInList(getInProgressList(), eTag, requestedOffset)
+ || isInList(getCompletedReadList(), eTag, requestedOffset));
+ }
+
+ private boolean isInList(final Collection<ReadBuffer> list, final String
eTag,
+ final long requestedOffset) {
+ return (getFromList(list, eTag, requestedOffset) != null);
+ }
+
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final
String eTag,
+ final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (eTag.equals(buffer.getETag())) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset
+ < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
}
/**
- * {@inheritDoc}
+ * If any buffer in the completed list can be reclaimed, reclaim it and return the buffer to the free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
+ * @return whether the eviction succeeded - i.e., were we able to free up one buffer
*/
- @VisibleForTesting
- @Override
- public int getNumBuffers() {
- return numberOfActiveBuffers;
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (getCompletedReadList().size() <= 0) {
+ return false; // there are no evict-able buffers
+ }
+
+ long currentTimeInMs = currentTimeMillis();
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isFullyConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ // Failed read buffers (with buffer index=-1) that are older than
+ // thresholdAge should be cleaned up, but at the same time should not
+ // report successful eviction.
+ // Queue logic expects that a buffer is freed up for read ahead when
+ // eviction is successful, whereas a failed ReadBuffer would have released
+ // its buffer when its status was set to READ_FAILED.
+ long earliestBirthday = Long.MAX_VALUE;
+ ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if ((buf.getBufferindex() != -1)
+ && (buf.getTimeStamp() < earliestBirthday)) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ } else if ((buf.getBufferindex() == -1)
+ && (currentTimeInMs - buf.getTimeStamp()) >
getThresholdAgeMilliseconds()) {
+ oldFailedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : oldFailedBuffers) {
+ manualEviction(buf);
+ }
+
+ if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds())
&& (nodeToEvict != null)) {
+ return manualEviction(nodeToEvict);
+ }
+
+ printTraceLog("No buffer eligible for eviction");
+ // nothing can be evicted
+ return false;
+ }
+
+ private boolean evict(final ReadBuffer buf) {
+ if (buf.getRefCount() > 0) {
+ // If the buffer is still being read, then we cannot evict it.
+ printTraceLog(
+ "Cannot evict buffer with index: {}, file: {}, with eTag: {},
offset: {} as it is still being read by some input stream",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+ return false;
+ }
+ // As failed ReadBuffers (bufferIndex = -1) are saved in getCompletedReadList(),
+ // avoid adding them to availableBufferList.
+ if (buf.getBufferindex() != -1) {
+ pushToFreeList(buf.getBufferindex());
+ }
+ getCompletedReadList().remove(buf);
+ buf.setTracingContext(null);
+ printTraceLog(
+ "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with
eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.isFullyConsumed(), buf.isAnyByteConsumed());
+ return true;
+ }
+
+ private void waitForProcess(final String eTag, final long position, boolean isFirstRead) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+ if (readBuf == null) {
+ readBuf = getFromList(getInProgressList(), eTag, position);
+ }
+ }
+ if (readBuf != null) { // if in the in-progress queue, then block for it
+ try {
+ printTraceLog("A relevant read buffer for file: {}, with eTag: {},
offset: {}, queued by stream: {}, having buffer idx: {} is being prefetched,
waiting for latch",
+ readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(),
readBuf.getStream().hashCode(), readBuf.getBufferindex());
+ readBuf.getLatch().await(); // blocking wait on the caller stream's
thread
+ // Note on correctness: readBuf gets out of getInProgressList() only in 1 place: after the worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // getInProgressList(). So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one would be able to change any state. If this becomes more complex in the future,
+ // then the latch can be removed and replaced with wait/notify whenever getInProgressList() is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+ + "buffer index: {} queued by stream: {}", readBuf.getPath(),
readBuf.getETag(),
+ readBuf.getOffset(), readBuf.getBufferindex(),
readBuf.getStream().hashCode());
+ }
+ }
+
+ private ReadBuffer clearFromReadAheadQueue(final String eTag, final long requestedOffset, boolean isFirstRead) {
+ ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
+ /*
+ * If this prefetch was triggered by the first read of this input stream,
+ * we should not remove it from the queue; let the worker threads complete it.
+ */
+ if (buffer != null && isFirstRead) {
+ return buffer;
+ }
+ if (buffer != null) {
+ getReadAheadQueue().remove(buffer);
+ notifyAll(); // lock is held in calling method
+ pushToFreeList(buffer.getBufferindex());
+ }
+ return null;
}
+
+ private int getBlockFromCompletedQueue(final String eTag, final long position,
+ final int length, final byte[] buffer) throws IOException {
+ ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+ if (buf == null) {
+ return 0;
+ }
+
+ buf.startReading(); // atomic increment of refCount.
+
+ if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+ // To prevent new read requests from failing due to old read-ahead attempts,
+ // return an exception only from buffers that failed within the last getThresholdAgeMilliseconds()
+ if (currentTimeMillis() - buf.getTimeStamp() < getThresholdAgeMilliseconds()) {
+ throw buf.getErrException();
+ } else {
+ return 0;
+ }
+ }
+
+ if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+ || (position >= buf.getOffset() + buf.getLength())) {
+ return 0;
+ }
+
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+
+ buf.endReading(); // atomic decrement of refCount
+ return lengthToCopy;
+ }
+
+ private ReadBuffer getBufferFromCompletedQueue(final String eTag, final long requestedOffset) {
+ for (ReadBuffer buffer : getCompletedReadList()) {
+ // Buffer is returned if the requestedOffset is at or above the buffer's
+ // offset but below the buffer's offset plus its length or requested length
+ if (eTag.equals(buffer.getETag())
+ && (requestedOffset >= buffer.getOffset())
+ && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+ || (requestedOffset < buffer.getOffset() +
buffer.getRequestedLength()))) {
+ return buffer;
+ }
+ }
+ return null;
+ }
+
+ private synchronized boolean tryMemoryUpscale() {
+ if (!isDynamicScalingEnabled) {
+ printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+ return false; // Dynamic scaling is disabled, so no upscaling.
+ }
+ double memoryLoad = getMemoryLoad();
+ if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
+ // Create and Add more buffers in getFreeList().
+ if (removedBufferList.isEmpty()) {
+ bufferPool[getNumBuffers()] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(getNumBuffers());
+ } else {
+ // Reuse a removed buffer index.
+ int freeIndex = removedBufferList.pop();
+ if (freeIndex >= bufferPool.length) {
+ printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+ freeIndex, bufferPool.length);
+ return false;
+ }
+ bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(freeIndex);
+ }
+ incrementActiveBufferCount();
+ printTraceLog("Current Memory Load: {}. Incrementing buffer pool size to
{}", memoryLoad, getNumBuffers());
+ return true;
+ }
+ printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load:
{}",
+ getNumBuffers(), memoryLoad);
+ return false;
+ }
+
+ private void scheduledEviction() {
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (currentTimeMillis() - buf.getTimeStamp() >
getThresholdAgeMilliseconds()) {
+ // If the buffer is older than thresholdAge, evict it.
+ printTraceLog("Scheduled Eviction of Buffer Triggered for BufferIndex:
{}, file: {}, with eTag: {}, offset: {}, length: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(),
buf.getOffset(), buf.getLength(), buf.getStream().hashCode());
+ evict(buf);
+ }
+ }
+
+ double memoryLoad = getMemoryLoad();
+ if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+ synchronized (this) {
+ if (isFreeListEmpty()) {
+ printTraceLog("No free buffers available. Skipping downscale of
buffer pool");
+ return; // No free buffers available, so cannot downscale.
+ }
+ int freeIndex = popFromFreeList();
+ bufferPool[freeIndex] = null;
+ removedBufferList.add(freeIndex);
+ decrementActiveBufferCount();
+ printTraceLog("Current Memory Load: {}. Decrementing buffer pool size
to {}", memoryLoad, getNumBuffers());
+ }
+ }
+ }
+
+ private boolean manualEviction(final ReadBuffer buf) {
+ printTraceLog("Manual Eviction of Buffer Triggered for BufferIndex: {},
file: {}, with eTag: {}, offset: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
buf.getStream().hashCode());
+ return evict(buf);
+ }
+
+ private void adjustThreadPool() {
Review Comment:
We only upscale the worker pool when there are enough queued requests, and we only queue a read-ahead when there is enough memory.
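
For readers following the thread, a minimal sketch of that gating (illustrative only, not the patch: `QUEUE_DEPTH_PER_WORKER`, `MIN_WORKERS`, and `MAX_WORKERS` are assumed names, the patch reads its bounds from configuration, and the real `adjustThreadPool()` is also driven by the CPU monitor scheduled in `init()`):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolScalingSketch {
  // Assumed bounds for illustration; the patch takes these from configs.
  private static final int MIN_WORKERS = 4;
  private static final int MAX_WORKERS = 64;
  private static final int QUEUE_DEPTH_PER_WORKER = 2; // assumed threshold

  private final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
      MIN_WORKERS, MAX_WORKERS, 60_000, TimeUnit.MILLISECONDS,
      new SynchronousQueue<>());
  // Stands in for the ReadBuffer read-ahead queue in the patch.
  private final LinkedBlockingQueue<Object> readAheadQueue =
      new LinkedBlockingQueue<>();

  // Called periodically by the CPU monitor. The pool grows only when the
  // queue is deep enough to justify another worker; queueing itself was
  // already gated on memory (tryMemoryUpscale()/tryEvict() in the patch),
  // so the monitor never scales up for work that could not be queued.
  void adjustThreadPool() {
    int workers = workerPool.getPoolSize();
    if (readAheadQueue.size() > workers * QUEUE_DEPTH_PER_WORKER
        && workers < MAX_WORKERS) {
      workerPool.setCorePoolSize(Math.min(workers + 1, MAX_WORKERS));
    }
  }
}
```

With the memory-side check kept in queueReadAhead(), the two conditions compose: queue depth is a reliable upscale signal precisely because nothing enters the queue unless a buffer was available or could be created.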
> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved
> aggressiveness
> ---------------------------------------------------------------------------------
>
> Key: HADOOP-19622
> URL: https://issues.apache.org/jira/browse/HADOOP-19622
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>