[ 
https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033134#comment-18033134
 ] 

ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------

anmolanmol1234 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2464780671


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java:
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit Tests around different components of Read Buffer Manager V2
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+  private volatile boolean running = true;
+  private final List<byte[]> allocations = new ArrayList<>();
+
+
+  public TestReadBufferManagerV2() throws Exception {
+    super();
+  }
+
+  /**
+   * Test to verify init of ReadBufferManagerV2
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testReadBufferManagerV2Init() throws Exception {
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
 getConfiguration());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    assertThat(ReadBufferManagerV2.getInstance())
+        .as("ReadBufferManager should be uninitialized").isNull();
+    intercept(IllegalStateException.class, "ReadBufferManagerV2 is not 
configured.", () -> {
+      ReadBufferManagerV2.getBufferManager();
+    });
+    // verify that multiple invocations of getBufferManager returns same 
instance.
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
 getConfiguration());
+    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager2 = 
ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+    assertThat(bufferManager).isNotNull();
+    assertThat(bufferManager2).isNotNull();
+    assertThat(bufferManager).isSameAs(bufferManager2);
+    assertThat(bufferManager3).isNotNull();
+    assertThat(bufferManager3).isSameAs(bufferManager);
+
+    // Verify default values are not invalid.
+    assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+    assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+  }
+
+  /**
+   * Test to verify that cpu monitor thread is not active if disabled.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    try(AzureBlobFileSystem fs = (AzureBlobFileSystem) 
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = 
fs.getAbfsStore().getAbfsConfiguration();
+      
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
 abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = 
ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should be initialized").isNotNull();
+      bufferManagerV2.resetBufferManager();
+    }
+
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+    try(AzureBlobFileSystem fs = (AzureBlobFileSystem) 
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = 
fs.getAbfsStore().getAbfsConfiguration();
+      
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
 abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = 
ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should not be initialized").isNull();
+      bufferManagerV2.resetBufferManager();
+    }
+  }
+
+  @Test
+  public void testThreadPoolDynamicScaling() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = 
testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+    Configuration configuration = getReabAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(),
 abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = 
ReadBufferManagerV2.getBufferManager();
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+    int[] reqOffset = {0};
+    int reqLength = 1;
+    Thread t = new Thread(() -> {
+      while (running) {
+        bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+            inputStream.getTracingContext());
+        reqOffset[0] += reqLength;
+      }
+    });
+    t.start();
+    Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(4);
+    running = false;
+    t.join();
+    Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+  }
+
+  @Test
+  public void testScheduledEviction() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = 
testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+    Configuration configuration = getReabAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(),
 abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = 
ReadBufferManagerV2.getBufferManager();
+    // Add a failed buffer to completed queue and set to no free buffers to 
read ahead.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    buff.setStream(inputStream);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(2);
+    Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(0);
+  }
+
+  @Test
+  public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws 
Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = 
testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+    Configuration configuration = getReabAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(),
 abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = 
Mockito.spy(ReadBufferManagerV2.getBufferManager());
+    Mockito.doReturn(0.6).when(bufferManagerV2).getMemoryLoad();
+    // Add a failed buffer to completed queue and set to no free buffers to 
read ahead.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    buff.setStream(inputStream);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+    bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+        inputStream.getTracingContext());
+    
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+  }
+
+  @Test
+  public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = 
testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+    Configuration configuration = getReabAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(),
 abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = 
Mockito.spy(ReadBufferManagerV2.getBufferManager());
+    Mockito.doReturn(0.4).when(bufferManagerV2).getMemoryLoad();
+    // Add a failed buffer to completed queue and set to no free buffers to 
read ahead.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    buff.setStream(inputStream);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+    bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+        inputStream.getTracingContext());
+    
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize()
 + 1);
+  }
+
+  @Test
+  public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {

Review Comment:
   Similar tests can be added for CPU upscale and downscale against the CPU usage threshold.





> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved 
> aggressiveness
> ---------------------------------------------------------------------------------
>
>                 Key: HADOOP-19622
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19622
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to