[ https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033432#comment-18033432 ]

ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------

anujmodi2021 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2467900691


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java:
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests around the different components of ReadBufferManagerV2.
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+  private volatile boolean running = true;
+  private final List<byte[]> allocations = new ArrayList<>();
+
+
+  public TestReadBufferManagerV2() throws Exception {
+    super();
+  }
+
+  /**
+   * Test to verify init of ReadBufferManagerV2
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testReadBufferManagerV2Init() throws Exception {
+    ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    assertThat(ReadBufferManagerV2.getInstance())
+        .as("ReadBufferManager should be uninitialized").isNull();
+    intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> {
+      ReadBufferManagerV2.getBufferManager();
+    });
+    // verify that multiple invocations of getBufferManager return the same instance.
+    ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
+    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+    assertThat(bufferManager).isNotNull();
+    assertThat(bufferManager2).isNotNull();
+    assertThat(bufferManager).isSameAs(bufferManager2);
+    assertThat(bufferManager3).isNotNull();
+    assertThat(bufferManager3).isSameAs(bufferManager);
+
+    // Verify default values are not invalid.
+    assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+    assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+  }
+
+  /**
+   * Test to verify that the CPU monitor thread is active only when dynamic scaling is enabled.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should be initialized").isNotNull();
+      bufferManagerV2.resetBufferManager();
+    }
+
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should not be initialized").isNull();
+      bufferManagerV2.resetBufferManager();
+    }
+  }
+
+  @Test
+  public void testThreadPoolDynamicScaling() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();

Review Comment:
   Nice catch
   Taken
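
For context, the lifecycle contract that testReadBufferManagerV2Init() exercises can be summarized in a minimal sketch. It uses only the accessor names visible in the test above (setReadBufferManagerConfigs, getBufferManager, getInstance) and simplifies the configuration parameters; it is not the actual implementation:

// Hypothetical sketch of the init contract exercised by the test above.
final class ReadBufferManagerV2Sketch {
  private static volatile ReadBufferManagerV2Sketch instance;
  private static volatile boolean configured;

  // Must be called before getBufferManager(); records sizing configs.
  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
    configured = true;
  }

  // Lazily creates the singleton; throws if configs were never set.
  static ReadBufferManagerV2Sketch getBufferManager() {
    if (!configured) {
      throw new IllegalStateException("ReadBufferManagerV2 is not configured.");
    }
    if (instance == null) {
      synchronized (ReadBufferManagerV2Sketch.class) {
        if (instance == null) {
          instance = new ReadBufferManagerV2Sketch();
        }
      }
    }
    return instance; // every caller gets the same instance
  }

  // Returns null until getBufferManager() has created the singleton.
  static ReadBufferManagerV2Sketch getInstance() {
    return instance;
  }
}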



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
         executorServiceKeepAliveTimeInMilliSec,
         TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
-        namedThreadFactory);
+        workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      workerRefs.add(worker);
       workerPool.submit(worker);
     }
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+    if (isDynamicScalingEnabled) {
+      cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
+        Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+        t.setDaemon(true);
+        return t;
+      });
+      cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+          getCpuMonitoringIntervalInMilliSec(), getCpuMonitoringIntervalInMilliSec(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {} 
worker threads",
+        numberOfActiveBuffers, workerRefs.size());
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to queue a read-ahead.
+   * @param stream the stream that the read-ahead is requested from.
+   * @param requestedOffset The offset in the file which should be read.
+   * @param requestedLength The length to read.
    */
   @Override
-  public void queueReadAhead(final AbfsInputStream stream,
-      final long requestedOffset,
-      final int requestedLength,
-      final TracingContext tracingContext) {
-    // TODO: To be implemented
+  public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+      final int requestedLength, TracingContext tracingContext) {
+    printTraceLog("Start Queueing readAhead for file: {}, with eTag: {}, 
offset: {}, length: {}, triggered by stream: {}",
+        stream.getPath(), stream.getETag(), requestedOffset, requestedLength, 
stream.hashCode());
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+        // Already queued for this offset, so skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as it is already queued",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+      if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        // No buffers are available and more buffers cannot be created. Skip queuing.
+        printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, 
offset: {}, triggered by stream: {} as no buffers are available",
+            stream.getPath(), stream.getETag(), requestedOffset, 
stream.hashCode());
+        return;
+      }
+
+      // Create a new ReadBuffer to keep the prefetched data and queue.
+      buffer = new ReadBuffer();
+      buffer.setStream(stream); // To map buffer with stream that requested it
+      buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      if (isFreeListEmpty()) {
+        /*
+         * By now there should be at least one buffer available.
+         * This is to make doubly sure that, after upscaling or eviction,
+         * we still have a free buffer available. If not, we skip queueing.
+         */
+        return;
+      }
+      Integer bufferIndex = popFromFreeList();
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset: 
{}, buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), requestedOffset, 
buffer.getBufferindex(), stream.hashCode());
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
+   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
+   *
+   * @param stream the stream of the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
    */
   @Override
-  public int getBlock(final AbfsInputStream stream,
-      final long position,
-      final int length,
-      final byte[] buffer) throws IOException {
-    // TODO: To be implemented
+  public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    printTraceLog("getBlock request for file: {}, with eTag: {}, for position: 
{} for length: {} received from stream: {}",
+        stream.getPath(), stream.getETag(), position, length, 
stream.hashCode());
+
+    String requestedETag = stream.getETag();
+    boolean isFirstRead = stream.isFirstRead();
+
+    // Wait for any in-progress read to complete.
+    waitForProcess(requestedETag, position, isFirstRead);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      printTraceLog("Done read from Cache for the file with eTag: {}, 
position: {}, length: {}, requested by stream: {}",
+          requestedETag, position, bytesRead, stream.hashCode());
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
     return 0;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
    */
   @Override
   public ReadBuffer getNextBlockToRead() throws InterruptedException {
-    // TODO: To be implemented
-    return null;
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      // Blocking Call to wait for prefetch to be queued.
+      while (getReadAheadQueue().size() == 0) {
+        wait();
+      }
+
+      buffer = getReadAheadQueue().remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      getInProgressList().add(buffer);
+    }
+    printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for 
offset: {}, queued by stream: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode());
+    return buffer;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this method to post completion.
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
    */
   @Override
-  public void doneReading(final ReadBuffer buffer,
-      final ReadBufferStatus result,
+  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
       final int bytesActuallyRead) {
-    // TODO: To be implemented
+    printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag: 
{}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(), 
buffer.getStream().hashCode(), result, bytesActuallyRead);
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          // Successful read, so update the buffer status and length
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          // Failed read, reuse buffer for next read, this buffer will be
+          // evicted later based on eviction policy.
+          pushToFreeList(buffer.getBufferindex());
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
+
+    // outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
 
   /**
-   * {@inheritDoc}
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManagerV2} when the stream is closed.
+   * @param stream input stream.
    */
-  @Override
-  public void purgeBuffersForStream(final AbfsInputStream stream) {
-    // TODO: To be implemented
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  private boolean isAlreadyQueued(final String eTag, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+        || isInList(getInProgressList(), eTag, requestedOffset)
+        || isInList(getCompletedReadList(), eTag, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final String eTag,

Review Comment:
   Added
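
For readers skimming the hunk above: the hand-off between queueReadAhead() (producer) and getNextBlockToRead() (worker/consumer) is the classic guarded-wait pattern on the manager's monitor. A self-contained sketch of that pattern, with generic names rather than the patch's own code:

import java.util.ArrayDeque;
import java.util.Queue;

// Guarded-wait hand-off: producers enqueue and notify, consumers wait
// in a loop until an element is available.
final class ReadAheadQueueSketch<T> {
  private final Queue<T> readAheadQueue = new ArrayDeque<>();

  synchronized void enqueue(T buffer) {
    readAheadQueue.add(buffer);
    notifyAll(); // wake any worker blocked in take()
  }

  synchronized T take() throws InterruptedException {
    while (readAheadQueue.isEmpty()) {
      wait(); // releases the monitor until enqueue() calls notifyAll()
    }
    return readAheadQueue.remove();
  }
}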


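Similarly, doneReading() counts the buffer's latch down outside the synchronized block; what makes that safe is the happens-before edge that CountDownLatch establishes between countDown() and a successful await(). A minimal illustration with hypothetical names, not the patch code:

import java.util.concurrent.CountDownLatch;

// The worker publishes its result before countDown(); the waiter is
// guaranteed to see that write once await() returns.
final class LatchPublicationSketch {
  private final CountDownLatch latch = new CountDownLatch(1);
  private int bytesRead; // written by the worker, read by the waiter

  void workerDone(int bytes) {
    bytesRead = bytes; // publish the result first
    latch.countDown(); // then release waiters
  }

  int awaitResult() throws InterruptedException {
    latch.await(); // blocks until workerDone() has run
    return bytesRead;
  }
}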



> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved 
> aggressiveness
> ---------------------------------------------------------------------------------
>
>                 Key: HADOOP-19622
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19622
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
