[ 
https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033438#comment-18033438
 ] 

ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------

anujmodi2021 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2467911950


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
         executorServiceKeepAliveTimeInMilliSec,
         TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
-        namedThreadFactory);
+        workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      workerRefs.add(worker);
       workerPool.submit(worker);
     }
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+    if (isDynamicScalingEnabled) {
+      cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable 
-> {
+        Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+        t.setDaemon(true);
+        return t;
+      });
+      cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+          getCpuMonitoringIntervalInMilliSec(), 
getCpuMonitoringIntervalInMilliSec(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {} 
worker threads",
+        numberOfActiveBuffers, workerRefs.size());
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to queue a read-ahead.
+   * @param stream which read-ahead is requested from.
+   * @param requestedOffset The offset in the file which should be read.
+   * @param requestedLength The length to read.
    */
   @Override
-  public void queueReadAhead(final AbfsInputStream stream,
-      final long requestedOffset,
-      final int requestedLength,
-      final TracingContext tracingContext) {
-    // TODO: To be implemented
+  public void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset,
+      final int requestedLength, TracingContext tracingContext) {
+    printTraceLog("Start Queueing readAhead for file: {}, with eTag: {}, 
offset: {}, length: {}, triggered by stream: {}",
+        stream.getPath(), stream.getETag(), requestedOffset, requestedLength, 
stream.hashCode());
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+        // Already queued for this offset, so skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as it is already queued",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+      if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        // No buffers are available and more buffers cannot be created. Skip 
queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as no buffers are available",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+
+      // Create a new ReadBuffer to keep the prefetched data and queue.
+      buffer = new ReadBuffer();
+      buffer.setStream(stream); // To map buffer with stream that requested it
+      buffer.setETag(stream.getETag()); // To map buffer with file it belongs 
to
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      if (isFreeListEmpty()) {
+        /*
+         * By now there should be at least one buffer available.
+         * This is to double sure that after upscaling or eviction,
+         * we still have free buffer available. If not, we skip queueing.
+         */
+        return;
+      }
+      Integer bufferIndex = popFromFreeList();
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset: 
{}, buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), requestedOffset, 
buffer.getBufferindex(), stream.hashCode());
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to read any bytes 
available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. 
If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead 
and reports cache miss. This is because
+   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in queue for an 
indeterminate amount of time).
+   *
+   * @param stream of the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will 
be written into from offset 0.
+   * @return the number of bytes read
    */
   @Override
-  public int getBlock(final AbfsInputStream stream,
-      final long position,
-      final int length,
-      final byte[] buffer) throws IOException {
-    // TODO: To be implemented
+  public int getBlock(final AbfsInputStream stream, final long position, final 
int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    printTraceLog("getBlock request for file: {}, with eTag: {}, for position: 
{} for length: {} received from stream: {}",
+        stream.getPath(), stream.getETag(), position, length, 
stream.hashCode());
+
+    String requestedETag = stream.getETag();
+    boolean isFirstRead = stream.isFirstRead();
+
+    // Wait for any in-progress read to complete.
+    waitForProcess(requestedETag, position, isFirstRead);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, 
buffer);
+    }
+    if (bytesRead > 0) {
+      printTraceLog("Done read from Cache for the file with eTag: {}, 
position: {}, length: {}, requested by stream: {}",
+          requestedETag, position, bytesRead, stream.hashCode());
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
     return 0;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this to get the next buffer that it 
should work on.
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
    */
   @Override
   public ReadBuffer getNextBlockToRead() throws InterruptedException {
-    // TODO: To be implemented
-    return null;
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      // Blocking Call to wait for prefetch to be queued.
+      while (getReadAheadQueue().size() == 0) {
+        wait();
+      }
+
+      buffer = getReadAheadQueue().remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      getInProgressList().add(buffer);
+    }
+    printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for 
offset: {}, queued by stream: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode());
+    return buffer;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this method to post completion.
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read 
operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was 
actually able to read
    */
   @Override
-  public void doneReading(final ReadBuffer buffer,
-      final ReadBufferStatus result,
+  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus 
result,
       final int bytesActuallyRead) {
-    // TODO: To be implemented
+    printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag: 
{}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode(), result, bytesActuallyRead);
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          // Successful read, so update the buffer status and length
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          // Failed read, reuse buffer for next read, this buffer will be
+          // evicted later based on eviction policy.
+          pushToFreeList(buffer.getBufferindex());
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
+
+    // outside the synchronized block, since anyone receiving a wake-up from the 
latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
 
   /**
-   * {@inheritDoc}
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManagerV2} when stream is closed.
+   * @param stream input stream.
    */
-  @Override
-  public void purgeBuffersForStream(final AbfsInputStream stream) {
-    // TODO: To be implemented
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == 
stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  private boolean isAlreadyQueued(final String eTag, final long 
requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+        || isInList(getInProgressList(), eTag, requestedOffset)
+        || isInList(getCompletedReadList(), eTag, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final String 
eTag,
+      final long requestedOffset) {
+    return (getFromList(list, eTag, requestedOffset) != null);
+  }
+
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final 
String eTag,
+      final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (eTag.equals(buffer.getETag())) {
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset
+            < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
   }
 
   /**
-   * {@inheritDoc}
+   * If any buffer in the completed list can be reclaimed then reclaim it and 
return the buffer to free list.
+   * The objective is to find just one buffer - there is no advantage to 
evicting more than one.
+   * @return whether the eviction succeeded - i.e., were we able to free up 
one buffer
    */
-  @VisibleForTesting
-  @Override
-  public int getNumBuffers() {
-    return numberOfActiveBuffers;
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (getCompletedReadList().size() <= 0) {
+      return false;  // there are no evict-able buffers
+    }
+
+    long currentTimeInMs = currentTimeMillis();
+
+    // first, try buffers where all bytes have been consumed (approximated as 
first and last bytes consumed)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isFullyConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (maybe a bad idea? 
have to experiment and see)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
+    long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > 
getThresholdAgeMilliseconds()) {
+        oldFailedBuffers.add(buf);
+      }
+    }
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      manualEviction(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds()) 
&& (nodeToEvict != null)) {
+      return manualEviction(nodeToEvict);
+    }
+
+    printTraceLog("No buffer eligible for eviction");
+    // nothing can be evicted
+    return false;
+  }
+
+  private boolean evict(final ReadBuffer buf) {
+    if (buf.getRefCount() > 0) {
+      // If the buffer is still being read, then we cannot evict it.
+      printTraceLog(
+          "Cannot evict buffer with index: {}, file: {}, with eTag: {}, 
offset: {} as it is still being read by some input stream",
+          buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+      return false;
+    }
+    // As failed ReadBuffers (bufferIndex = -1) are saved in 
getCompletedReadList(),
+    // avoid adding it to availableBufferList.
+    if (buf.getBufferindex() != -1) {
+      pushToFreeList(buf.getBufferindex());
+    }
+    getCompletedReadList().remove(buf);
+    buf.setTracingContext(null);
+    printTraceLog(
+        "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with 
eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+        buf.isFullyConsumed(), buf.isAnyByteConsumed());
+    return true;
+  }
+
+  private void waitForProcess(final String eTag, final long position, boolean 
isFirstRead) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+      if (readBuf == null) {
+        readBuf = getFromList(getInProgressList(), eTag, position);
+      }
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for 
it
+      try {
+        printTraceLog("A relevant read buffer for file: {}, with eTag: {}, 
offset: {}, queued by stream: {}, having buffer idx: {} is being prefetched, 
waiting for latch",
+            readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(), 
readBuf.getStream().hashCode(), readBuf.getBufferindex());
+        readBuf.getLatch().await();  // blocking wait on the caller stream's 
thread
+        // Note on correctness: readBuf gets out of getInProgressList() only 
in 1 place: after worker thread
+        // is done processing it (in doneReading). There, the latch is set 
after removing the buffer from
+        // getInProgressList(). So this latch is safe to be outside the 
synchronized block.
+        // Putting it in synchronized would result in a deadlock, since this 
thread would be holding the lock
+        // while waiting, so no one will be able to change any state. If this 
becomes more complex in the future,
+        // then the latch can be removed and replaced with wait/notify 
whenever getInProgressList() is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+          + "buffer index: {} queued by stream: {}", readBuf.getPath(), 
readBuf.getETag(),
+          readBuf.getOffset(), readBuf.getBufferindex(), 
readBuf.getStream().hashCode());
+    }
+  }
+
+  private ReadBuffer clearFromReadAheadQueue(final String eTag, final long 
requestedOffset, boolean isFirstRead) {
+    ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, 
requestedOffset);
+    /*
+     * If this prefetch was triggered by first read of this input stream,
+     * we should not remove it from queue and let it complete by backend 
threads.
+     */
+    if (buffer != null && isFirstRead) {
+      return buffer;
+    }
+    if (buffer != null) {
+      getReadAheadQueue().remove(buffer);
+      notifyAll();   // lock is held in calling method
+      pushToFreeList(buffer.getBufferindex());
+    }
+    return null;
   }
+
+  private int getBlockFromCompletedQueue(final String eTag, final long 
position,
+      final int length, final byte[] buffer) throws IOException {
+    ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+    if (buf == null) {
+      return 0;
+    }
+
+    buf.startReading(); // atomic increment of refCount.
+
+    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+      // To prevent new read requests to fail due to old read-ahead attempts,
+      // return exception only from buffers that failed within last 
getThresholdAgeMilliseconds()
+      if ((currentTimeMillis() - (buf.getTimeStamp()) < 
getThresholdAgeMilliseconds())) {
+        throw buf.getErrException();
+      } else {
+        return 0;
+      }
+    }
+
+    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+        || (position >= buf.getOffset() + buf.getLength())) {
+      return 0;
+    }
+
+    int cursor = (int) (position - buf.getOffset());
+    int availableLengthInBuffer = buf.getLength() - cursor;
+    int lengthToCopy = Math.min(length, availableLengthInBuffer);
+    System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+    if (cursor == 0) {
+      buf.setFirstByteConsumed(true);
+    }
+    if (cursor + lengthToCopy == buf.getLength()) {
+      buf.setLastByteConsumed(true);
+    }
+    buf.setAnyByteConsumed(true);
+
+    buf.endReading(); // atomic decrement of refCount
+    return lengthToCopy;
+  }
+
+  private ReadBuffer getBufferFromCompletedQueue(final String eTag, final long 
requestedOffset) {
+    for (ReadBuffer buffer : getCompletedReadList()) {
+      // Buffer is returned if the requestedOffset is at or above buffer's
+      // offset but less than buffer's length or the actual requestedLength
+      if (eTag.equals(buffer.getETag())
+          && (requestedOffset >= buffer.getOffset())
+          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+          || (requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()))) {
+        return buffer;
+      }
+    }
+    return null;
+  }
+
+  private synchronized boolean tryMemoryUpscale() {
+    if (!isDynamicScalingEnabled) {
+      printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+      return false; // Dynamic scaling is disabled, so no upscaling.
+    }
+    double memoryLoad = getMemoryLoad();
+    if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
+      // Create and Add more buffers in getFreeList().
+      if (removedBufferList.isEmpty()) {
+        bufferPool[getNumBuffers()] = new byte[getReadAheadBlockSize()];
+        pushToFreeList(getNumBuffers());
+      } else {
+        // Reuse a removed buffer index.
+        int freeIndex = removedBufferList.pop();
+        if (freeIndex >= bufferPool.length) {
+          printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+              freeIndex, bufferPool.length);
+          return false;
+        }
+        bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+        pushToFreeList(freeIndex);
+      }
+      incrementActiveBufferCount();
+      printTraceLog("Current Memory Load: {}. Incrementing buffer pool size to 
{}", memoryLoad, getNumBuffers());
+      return true;
+    }
+    printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load: 
{}",
+        getNumBuffers(), memoryLoad);
+    return false;
+  }
+
+  private void scheduledEviction() {
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (currentTimeMillis() - buf.getTimeStamp() > 
getThresholdAgeMilliseconds()) {
+        // If the buffer is older than thresholdAge, evict it.
+        printTraceLog("Scheduled Eviction of Buffer Triggered for BufferIndex: 
{}, file: {}, with eTag: {}, offset: {}, length: {}, queued by stream: {}",
+            buf.getBufferindex(), buf.getPath(), buf.getETag(), 
buf.getOffset(), buf.getLength(), buf.getStream().hashCode());
+        evict(buf);
+      }
+    }
+
+    double memoryLoad = getMemoryLoad();
+    if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+      synchronized (this) {
+        if (isFreeListEmpty()) {
+          printTraceLog("No free buffers available. Skipping downscale of 
buffer pool");
+          return; // No free buffers available, so cannot downscale.
+        }
+        int freeIndex = popFromFreeList();
+        bufferPool[freeIndex] = null;
+        removedBufferList.add(freeIndex);
+        decrementActiveBufferCount();
+        printTraceLog("Current Memory Load: {}. Decrementing buffer pool size 
to {}", memoryLoad, getNumBuffers());
+      }
+    }
+  }
+
+  private boolean manualEviction(final ReadBuffer buf) {
+    printTraceLog("Manual Eviction of Buffer Triggered for BufferIndex: {}, 
file: {}, with eTag: {}, offset: {}, queued by stream: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(), 
buf.getStream().hashCode());
+    return evict(buf);
+  }
+
+  private void adjustThreadPool() {
+    int currentPoolSize = workerRefs.size();
+    double cpuLoad = getCpuLoad();
+    int requiredPoolSize = getRequiredThreadPoolSize();
+    int newThreadPoolSize;
+    printTraceLog("Current CPU load: {}, Current worker pool size: {}, Current 
queue size: {}", cpuLoad, currentPoolSize, requiredPoolSize);
+    if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      // Submit more background tasks.
+      newThreadPoolSize = Math.min(maxThreadPoolSize,
+          (int) Math.ceil((currentPoolSize * (ONE_HUNDRED + 
threadPoolUpscalePercentage))/ONE_HUNDRED));
+      // Create new Worker Threads
+      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+        workerRefs.add(worker);
+        workerPool.submit(worker);
+      }
+      printTraceLog("Increased worker pool size from {} to {}", 
currentPoolSize, newThreadPoolSize);
+    } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
+      newThreadPoolSize = Math.max(minThreadPoolSize,
+          (int) Math.ceil((currentPoolSize * (ONE_HUNDRED - 
threadPoolDownscalePercentage))/ONE_HUNDRED));
+      // Signal the extra workers to stop
+      while (workerRefs.size() > newThreadPoolSize) {
+        ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+        worker.stop();
+      }
+      printTraceLog("Decreased worker pool size from {} to {}", 
currentPoolSize, newThreadPoolSize);
+    } else {
+      printTraceLog("No change in worker pool size. CPU load: {} Pool size: 
{}", cpuLoad, currentPoolSize);
+    }
+  }
+
   /**
-   * {@inheritDoc}
+   * Similar to System.currentTimeMillis, except implemented with 
System.nanoTime().
+   * System.currentTimeMillis can go backwards when system clock is changed 
(e.g., with NTP time synchronization),
+   * making it unsuitable for measuring time intervals. nanotime is strictly 
monotonically increasing per CPU core.
+   * Note: it is not monotonic across sockets, and even within a CPU, it's only 
the
+   * more recent parts which share a clock across all cores.
+   *
+   * @return current time in milliseconds
    */
-  @VisibleForTesting
-  @Override
-  public void callTryEvict() {
-    // TODO: To be implemented
+  private long currentTimeMillis() {
+    return System.nanoTime() / 1000 / 1000;
+  }
+
+  private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+    for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+      ReadBuffer readBuffer = it.next();
+      if (readBuffer.getStream() == stream) {
+        it.remove();
+        // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+        // list in doneReading method, we will skip adding those here again.
+        if (readBuffer.getBufferindex() != -1) {
+          pushToFreeList(readBuffer.getBufferindex());
+        }
+      }
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * Test method that can clean up the current state of readAhead buffers and
+   * the lists. Will also trigger a fresh init.
    */
   @VisibleForTesting
   @Override
   public void testResetReadBufferManager() {
-    // TODO: To be implemented
+    synchronized (this) {
+      ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+      for (ReadBuffer buf : getCompletedReadList()) {
+        if (buf != null) {
+          completedBuffers.add(buf);
+        }
+      }
+
+      for (ReadBuffer buf : completedBuffers) {
+        manualEviction(buf);
+      }
+
+      getReadAheadQueue().clear();
+      getInProgressList().clear();
+      getCompletedReadList().clear();
+      getFreeList().clear();
+      for (int i = 0; i < maxBufferPoolSize; i++) {
+        bufferPool[i] = null;
+      }
+      bufferPool = null;
+      cpuMonitorThread.shutdownNow();
+      memoryMonitorThread.shutdownNow();
+      workerPool.shutdownNow();
+      resetBufferManager();
+    }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @VisibleForTesting
   @Override
-  public void testResetReadBufferManager(final int readAheadBlockSize,
-      final int thresholdAgeMilliseconds) {
-    // TODO: To be implemented
+  public void testResetReadBufferManager(int readAheadBlockSize, int 
thresholdAgeMilliseconds) {
+    setReadAheadBlockSize(readAheadBlockSize);
+    setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+    testResetReadBufferManager();
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
-    // TODO: To be implemented
+  @VisibleForTesting
+  public void callTryEvict() {
+    tryEvict();
   }
 
-  private final ThreadFactory namedThreadFactory = new ThreadFactory() {
-    private int count = 0;
-    @Override
-    public Thread newThread(Runnable r) {
-      return new Thread(r, "ReadAheadV2-Thread-" + count++);
+  @VisibleForTesting
+  public int getNumBuffers() {
+    LOCK.lock();
+    try {
+      return numberOfActiveBuffers;
+    } finally {
+      LOCK.unlock();
     }
-  };
+  }
 
   @Override
   void resetBufferManager() {
     setBufferManager(null); // reset the singleton instance
+    setIsConfigured(false);
   }
 
   private static void setBufferManager(ReadBufferManagerV2 manager) {
     bufferManager = manager;
   }
+
+  private static void setIsConfigured(boolean configured) {
+    isConfigured = configured;
+  }
+
+  private final ThreadFactory workerThreadFactory = new ThreadFactory() {
+    private int count = 0;
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count++);
+      t.setDaemon(true);
+      return t;
+    }
+  };
+
+  private void printTraceLog(String message, Object... args) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace(message, args);
+    }
+  }
+
+  private void printDebugLog(String message, Object... args) {
+    LOGGER.debug(message, args);
+  }
+
+  @VisibleForTesting
+  double getMemoryLoad() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+  }
+
+  @VisibleForTesting
+  public double getCpuLoad() {
+    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+        OperatingSystemMXBean.class);
+    return osBean.getSystemCpuLoad();

Review Comment:
   Nice catch
   Taken



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -87,17 +111,53 @@ static ReadBufferManagerV2 getBufferManager() {
   }
 
   /**
-   * {@inheritDoc}
+   * Set the ReadBufferManagerV2 configurations from the provided values before 
singleton initialization.
+   * @param readAheadBlockSize the read-ahead block size to set for the 
ReadBufferManagerV2.
+   * @param abfsConfiguration the configuration to set for the 
ReadBufferManagerV2.
+   */
+  public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
+      final AbfsConfiguration abfsConfiguration) {
+    // Set Configs only before initializations.
+    if (bufferManager == null && !isConfigured) {
+      minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+      maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+      cpuMonitoringIntervalInMilliSec = 
abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
+      cpuThreshold = 
abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()/ ONE_HUNDRED;
+      threadPoolUpscalePercentage = 
abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
+      threadPoolDownscalePercentage = 
abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
+      executorServiceKeepAliveTimeInMilliSec = 
abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+      minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();

Review Comment:
   Taken



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -128,13 +128,20 @@ public final class FileSystemConfigurations {
   public static final long 
DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
 
   public static final boolean DEFAULT_ENABLE_READAHEAD = true;
-  public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
+  public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = true;
+  public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = 
true;

Review Comment:
   Taken





> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved 
> aggressiveness
> ---------------------------------------------------------------------------------
>
>                 Key: HADOOP-19622
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19622
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to