[ https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033296#comment-18033296 ]

ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------

manika137 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2466464367


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
         executorServiceKeepAliveTimeInMilliSec,
         TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
-        namedThreadFactory);
+        workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      workerRefs.add(worker);
       workerPool.submit(worker);
     }
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
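+    // When dynamic scaling is enabled, a daemon thread periodically invokes
+    // adjustThreadPool() to resize the worker pool based on observed CPU usage.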
+    if (isDynamicScalingEnabled) {
+      cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
+        Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+        t.setDaemon(true);
+        return t;
+      });
+      cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+          getCpuMonitoringIntervalInMilliSec(), getCpuMonitoringIntervalInMilliSec(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {} 
worker threads",
+        numberOfActiveBuffers, workerRefs.size());
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to queue a read-ahead.
+   * @param stream which read-ahead is requested from.
+   * @param requestedOffset The offset in the file which should be read.
+   * @param requestedLength The length to read.
    */
   @Override
-  public void queueReadAhead(final AbfsInputStream stream,
-      final long requestedOffset,
-      final int requestedLength,
-      final TracingContext tracingContext) {
-    // TODO: To be implemented
+  public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+      final int requestedLength, TracingContext tracingContext) {
+    printTraceLog("Start Queueing readAhead for file: {}, with eTag: {}, 
offset: {}, length: {}, triggered by stream: {}",
+        stream.getPath(), stream.getETag(), requestedOffset, requestedLength, 
stream.hashCode());
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+        // Already queued for this offset, so skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as it is already queued",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+      if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        // No buffers are available and more buffers cannot be created. Skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+            stream.getPath(), stream.getETag(), requestedOffset, stream.hashCode());
+        return;
+      }
+
+      // Create a new ReadBuffer to keep the prefetched data and queue.
+      buffer = new ReadBuffer();
+      buffer.setStream(stream); // To map buffer with stream that requested it
+      buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      if (isFreeListEmpty()) {
+        /*
+         * By now there should be at least one buffer available.
+         * This is to make doubly sure that, after upscaling or eviction,
+         * we still have a free buffer available. If not, we skip queueing.
+         */
+        return;
+      }
+      Integer bufferIndex = popFromFreeList();
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset: 
{}, buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), requestedOffset, 
buffer.getBufferindex(), stream.hashCode());
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is
+   * because, depending on worker thread availability, the read-ahead may take a while - the calling thread can do
+   * its own read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+   *
+   * @param stream of the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
    */
   @Override
-  public int getBlock(final AbfsInputStream stream,
-      final long position,
-      final int length,
-      final byte[] buffer) throws IOException {
-    // TODO: To be implemented
+  public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    printTraceLog("getBlock request for file: {}, with eTag: {}, for position: 
{} for length: {} received from stream: {}",
+        stream.getPath(), stream.getETag(), position, length, 
stream.hashCode());
+
+    String requestedETag = stream.getETag();
+    boolean isFirstRead = stream.isFirstRead();
+
+    // Wait for any in-progress read to complete.
+    waitForProcess(requestedETag, position, isFirstRead);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      printTraceLog("Done read from Cache for the file with eTag: {}, 
position: {}, length: {}, requested by stream: {}",
+          requestedETag, position, bytesRead, stream.hashCode());
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
     return 0;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
    */
   @Override
   public ReadBuffer getNextBlockToRead() throws InterruptedException {
-    // TODO: To be implemented
-    return null;
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      // Blocking Call to wait for prefetch to be queued.
+      while (getReadAheadQueue().size() == 0) {
+        wait();
+      }
+
+      buffer = getReadAheadQueue().remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      getInProgressList().add(buffer);
+    }
+    printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for 
offset: {}, queued by stream: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode());
+    return buffer;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this method to post completion.
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
    */
   @Override
-  public void doneReading(final ReadBuffer buffer,
-      final ReadBufferStatus result,
+  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
       final int bytesActuallyRead) {
-    // TODO: To be implemented
+    printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag: 
{}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode(), result, bytesActuallyRead);
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          // Successful read, so update the buffer status and length
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          // Failed read, reuse buffer for next read, this buffer will be
+          // evicted later based on eviction policy.
+          pushToFreeList(buffer.getBufferindex());
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
+
+    // outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
 
   /**
-   * {@inheritDoc}
+   * Purges the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManagerV2} when the stream is closed.
+   * @param stream input stream.
    */
-  @Override
-  public void purgeBuffersForStream(final AbfsInputStream stream) {
-    // TODO: To be implemented
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  private boolean isAlreadyQueued(final String eTag, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+        || isInList(getInProgressList(), eTag, requestedOffset)
+        || isInList(getCompletedReadList(), eTag, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
+      final long requestedOffset) {
+    return (getFromList(list, eTag, requestedOffset) != null);
+  }
+
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final String eTag,
+      final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (eTag.equals(buffer.getETag())) {
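+        // A completed (AVAILABLE) buffer matches against the bytes actually
+        // read; a queued or in-progress buffer matches against the requested
+        // length, since its final length is not known yet.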
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset
+            < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
   }
 
   /**
-   * {@inheritDoc}
+   * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
    */
-  @VisibleForTesting
-  @Override
-  public int getNumBuffers() {
-    return numberOfActiveBuffers;
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (getCompletedReadList().size() <= 0) {
+      return false;  // there are no evict-able buffers
+    }
+
+    long currentTimeInMs = currentTimeMillis();
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isFullyConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
+    long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > 
getThresholdAgeMilliseconds()) {
+        oldFailedBuffers.add(buf);
+      }
+    }
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      manualEviction(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds()) && (nodeToEvict != null)) {
+      return manualEviction(nodeToEvict);
+    }
+
+    printTraceLog("No buffer eligible for eviction");
+    // nothing can be evicted
+    return false;
+  }
+
+  private boolean evict(final ReadBuffer buf) {
+    if (buf.getRefCount() > 0) {
+      // If the buffer is still being read, then we cannot evict it.
+      printTraceLog(
+          "Cannot evict buffer with index: {}, file: {}, with eTag: {}, 
offset: {} as it is still being read by some input stream",
+          buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+      return false;
+    }
+    // As failed ReadBuffers (bufferIndex = -1) are saved in getCompletedReadList(),
+    // avoid adding them to availableBufferList.
+    if (buf.getBufferindex() != -1) {
+      pushToFreeList(buf.getBufferindex());
+    }
+    getCompletedReadList().remove(buf);
+    buf.setTracingContext(null);
+    printTraceLog(
+        "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with 
eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+        buf.isFullyConsumed(), buf.isAnyByteConsumed());
+    return true;
+  }
+
+  private void waitForProcess(final String eTag, final long position, boolean isFirstRead) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+      if (readBuf == null) {
+        readBuf = getFromList(getInProgressList(), eTag, position);
+      }
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for it
+      try {
+        printTraceLog("A relevant read buffer for file: {}, with eTag: {}, 
offset: {}, queued by stream: {}, having buffer idx: {} is being prefetched, 
waiting for latch",
+            readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(), 
readBuf.getStream().hashCode(), readBuf.getBufferindex());
+        readBuf.getLatch().await();  // blocking wait on the caller stream's 
thread
+        // Note on correctness: readBuf gets out of getInProgressList() only in 1 place: after worker thread
+        // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+        // getInProgressList(). So this latch is safe to be outside the synchronized block.
+        // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+        // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+        // then the latch cane be removed and replaced with wait/notify whenever getInProgressList() is touched.

Review Comment:
   Nit: spelling: `cane` should be `can` (in the last line of the comment above).
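   
   With the spelling fixed, that line would read (no other change intended):
   
   // then the latch can be removed and replaced with wait/notify whenever getInProgressList() is touched.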





> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved 
> aggressiveness
> ---------------------------------------------------------------------------------
>
>                 Key: HADOOP-19622
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19622
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
