anujmodi2021 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2467900691


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java:
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit Tests around different components of Read Buffer Manager V2
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+  private volatile boolean running = true;
+  private final List<byte[]> allocations = new ArrayList<>();
+
+
+  public TestReadBufferManagerV2() throws Exception {
+    super();
+  }
+
+  /**
+   * Test to verify init of ReadBufferManagerV2
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testReadBufferManagerV2Init() throws Exception {
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
 getConfiguration());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    assertThat(ReadBufferManagerV2.getInstance())
+        .as("ReadBufferManager should be uninitialized").isNull();
+    intercept(IllegalStateException.class, "ReadBufferManagerV2 is not 
configured.", () -> {
+      ReadBufferManagerV2.getBufferManager();
+    });
+    // verify that multiple invocations of getBufferManager returns same 
instance.
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
 getConfiguration());
+    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager2 = 
ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+    assertThat(bufferManager).isNotNull();
+    assertThat(bufferManager2).isNotNull();
+    assertThat(bufferManager).isSameAs(bufferManager2);
+    assertThat(bufferManager3).isNotNull();
+    assertThat(bufferManager3).isSameAs(bufferManager);
+
+    // Verify default values are not invalid.
+    assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+    assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+  }
+
+  /**
+   * Test to verify that cpu monitor thread is not active if disabled.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    try(AzureBlobFileSystem fs = (AzureBlobFileSystem) 
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = 
fs.getAbfsStore().getAbfsConfiguration();
+      
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
 abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = 
ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should be initialized").isNotNull();
+      bufferManagerV2.resetBufferManager();
+    }
+
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+    try(AzureBlobFileSystem fs = (AzureBlobFileSystem) 
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = 
fs.getAbfsStore().getAbfsConfiguration();
+      
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
 abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = 
ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should not be initialized").isNull();
+      bufferManagerV2.resetBufferManager();
+    }
+  }
+
+  @Test
+  public void testThreadPoolDynamicScaling() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();

Review Comment:
   Nice catch
   Taken



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
         executorServiceKeepAliveTimeInMilliSec,
         TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
-        namedThreadFactory);
+        workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      workerRefs.add(worker);
       workerPool.submit(worker);
     }
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+    if (isDynamicScalingEnabled) {
+      cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable 
-> {
+        Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+        t.setDaemon(true);
+        return t;
+      });
+      cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+          getCpuMonitoringIntervalInMilliSec(), 
getCpuMonitoringIntervalInMilliSec(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {} 
worker threads",
+        numberOfActiveBuffers, workerRefs.size());
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to queueing read-ahead.
+   * @param stream which read-ahead is requested from.
+   * @param requestedOffset The offset in the file which should be read.
+   * @param requestedLength The length to read.
    */
   @Override
-  public void queueReadAhead(final AbfsInputStream stream,
-      final long requestedOffset,
-      final int requestedLength,
-      final TracingContext tracingContext) {
-    // TODO: To be implemented
+  public void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset,
+      final int requestedLength, TracingContext tracingContext) {
+    printTraceLog("Start Queueing readAhead for file: {}, with eTag: {}, 
offset: {}, length: {}, triggered by stream: {}",
+        stream.getPath(), stream.getETag(), requestedOffset, requestedLength, 
stream.hashCode());
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+        // Already queued for this offset, so skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as it is already queued",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+      if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        // No buffers are available and more buffers cannot be created. Skip 
queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as no buffers are available",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+
+      // Create a new ReadBuffer to keep the prefetched data and queue.
+      buffer = new ReadBuffer();
+      buffer.setStream(stream); // To map buffer with stream that requested it
+      buffer.setETag(stream.getETag()); // To map buffer with file it belongs 
to
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      if (isFreeListEmpty()) {
+        /*
+         * By now there should be at least one buffer available.
+         * This is to double sure that after upscaling or eviction,
+         * we still have free buffer available. If not, we skip queueing.
+         */
+        return;
+      }
+      Integer bufferIndex = popFromFreeList();
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset: 
{}, buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), requestedOffset, 
buffer.getBufferindex(), stream.hashCode());
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. 
If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead 
and reports cache miss. This is because
+   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in queue for an 
indeterminate amount of time).
+   *
+   * @param stream of the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will 
be written into from offset 0.
+   * @return the number of bytes read
    */
   @Override
-  public int getBlock(final AbfsInputStream stream,
-      final long position,
-      final int length,
-      final byte[] buffer) throws IOException {
-    // TODO: To be implemented
+  public int getBlock(final AbfsInputStream stream, final long position, final 
int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    printTraceLog("getBlock request for file: {}, with eTag: {}, for position: 
{} for length: {} received from stream: {}",
+        stream.getPath(), stream.getETag(), position, length, 
stream.hashCode());
+
+    String requestedETag = stream.getETag();
+    boolean isFirstRead = stream.isFirstRead();
+
+    // Wait for any in-progress read to complete.
+    waitForProcess(requestedETag, position, isFirstRead);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, 
buffer);
+    }
+    if (bytesRead > 0) {
+      printTraceLog("Done read from Cache for the file with eTag: {}, 
position: {}, length: {}, requested by stream: {}",
+          requestedETag, position, bytesRead, stream.hashCode());
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
     return 0;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this to get the next buffer that it 
should work on.
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
    */
   @Override
   public ReadBuffer getNextBlockToRead() throws InterruptedException {
-    // TODO: To be implemented
-    return null;
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      // Blocking Call to wait for prefetch to be queued.
+      while (getReadAheadQueue().size() == 0) {
+        wait();
+      }
+
+      buffer = getReadAheadQueue().remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      getInProgressList().add(buffer);
+    }
+    printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for 
offset: {}, queued by stream: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode());
+    return buffer;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this method to post completion.   *
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read 
operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was 
actually able to read
    */
   @Override
-  public void doneReading(final ReadBuffer buffer,
-      final ReadBufferStatus result,
+  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus 
result,
       final int bytesActuallyRead) {
-    // TODO: To be implemented
+    printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag: 
{}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode(), result, bytesActuallyRead);
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          // Successful read, so update the buffer status and length
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          // Failed read, reuse buffer for next read, this buffer will be
+          // evicted later based on eviction policy.
+          pushToFreeList(buffer.getBufferindex());
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
+
+    //outside the synchronized, since anyone receiving a wake-up from the 
latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
 
   /**
-   * {@inheritDoc}
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManagerV2} when stream is closed.
+   * @param stream input stream.
    */
-  @Override
-  public void purgeBuffersForStream(final AbfsInputStream stream) {
-    // TODO: To be implemented
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == 
stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  private boolean isAlreadyQueued(final String eTag, final long 
requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+        || isInList(getInProgressList(), eTag, requestedOffset)
+        || isInList(getCompletedReadList(), eTag, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final String 
eTag,

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to