anujmodi2021 commented on code in PR #7832: URL: https://github.com/apache/hadoop/pull/7832#discussion_r2476640605
########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java: ########## @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit Tests around different components of Read Buffer Manager V2 + */ +public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest { + private volatile boolean running = true; + private final List<byte[]> allocations = new ArrayList<>(); + + + public TestReadBufferManagerV2() throws Exception { + super(); + } + + /** + * Test to verify init of ReadBufferManagerV2 + * @throws Exception if test fails + */ + @Test + public void testReadBufferManagerV2Init() throws Exception { + ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration()); + ReadBufferManagerV2.getBufferManager().testResetReadBufferManager(); + assertThat(ReadBufferManagerV2.getInstance()) + .as("ReadBufferManager should be uninitialized").isNull(); + intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> { + ReadBufferManagerV2.getBufferManager(); + }); + // verify that multiple invocations of getBufferManager returns same instance. + ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration()); + ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager(); + ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager(); + ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance(); + assertThat(bufferManager).isNotNull(); + assertThat(bufferManager2).isNotNull(); + assertThat(bufferManager).isSameAs(bufferManager2); + assertThat(bufferManager3).isNotNull(); + assertThat(bufferManager3).isSameAs(bufferManager); + + // Verify default values are not invalid. + assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0); + assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0); + } + + /** + * Test to verify that cpu monitor thread is not active if disabled. + * @throws Exception if test fails + */ + @Test + public void testDynamicScalingSwitchingOnAndOff() throws Exception { + Configuration conf = new Configuration(getRawConfiguration()); + conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true); + conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true); + try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) { + AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); + ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration); + ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(); + assertThat(bufferManagerV2.getCpuMonitoringThread()) + .as("CPU Monitor thread should be initialized").isNotNull(); + bufferManagerV2.resetBufferManager(); + } + + conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false); + try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) { + AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); + ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration); + ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(); + assertThat(bufferManagerV2.getCpuMonitoringThread()) + .as("CPU Monitor thread should not be initialized").isNull(); + bufferManagerV2.resetBufferManager(); + } + } + + @Test + public void testThreadPoolDynamicScaling() throws Exception { + TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream(); + AbfsClient client = testAbfsInputStream.getMockAbfsClient(); + AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt"); + Configuration configuration = getReabAheadV2Configuration(); + AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, + getAccountName()); + ReadBufferManagerV2.getBufferManager().testResetReadBufferManager(); + ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig); + ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(); + assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2); + int[] reqOffset = {0}; + int reqLength = 1; + Thread t = new Thread(() -> { + while (running) { + bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength, + inputStream.getTracingContext()); + reqOffset[0] += reqLength; + } + }); + t.start(); + Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec()); + assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(4); + running = false; + t.join(); + Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec()); + assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4); + } + + @Test + public void testScheduledEviction() throws Exception { + TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream(); + AbfsClient client = testAbfsInputStream.getMockAbfsClient(); + AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt"); + Configuration configuration = getReabAheadV2Configuration(); + AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, + getAccountName()); + ReadBufferManagerV2.getBufferManager().testResetReadBufferManager(); + ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig); + ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(); + // Add a failed buffer to completed queue and set to no free buffers to read ahead. + ReadBuffer buff = new ReadBuffer(); + buff.setStatus(ReadBufferStatus.READ_FAILED); + buff.setStream(inputStream); + bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff); + bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff); + assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(2); + Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec()); + assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(0); + } + + @Test + public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception { + TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream(); + AbfsClient client = testAbfsInputStream.getMockAbfsClient(); + AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt"); + Configuration configuration = getReabAheadV2Configuration(); + AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, + getAccountName()); + ReadBufferManagerV2.getBufferManager().testResetReadBufferManager(); + ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig); + ReadBufferManagerV2 bufferManagerV2 = Mockito.spy(ReadBufferManagerV2.getBufferManager()); + Mockito.doReturn(0.6).when(bufferManagerV2).getMemoryLoad(); + // Add a failed buffer to completed queue and set to no free buffers to read ahead. + ReadBuffer buff = new ReadBuffer(); + buff.setStatus(ReadBufferStatus.READ_FAILED); + buff.setStream(inputStream); + bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff); + assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize()); + bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB, + inputStream.getTracingContext()); + assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize()); + } + + @Test + public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception { + TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream(); + AbfsClient client = testAbfsInputStream.getMockAbfsClient(); + AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt"); + Configuration configuration = getReabAheadV2Configuration(); + AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, + getAccountName()); + ReadBufferManagerV2.getBufferManager().testResetReadBufferManager(); + ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig); + ReadBufferManagerV2 bufferManagerV2 = Mockito.spy(ReadBufferManagerV2.getBufferManager()); + Mockito.doReturn(0.4).when(bufferManagerV2).getMemoryLoad(); + // Add a failed buffer to completed queue and set to no free buffers to read ahead. + ReadBuffer buff = new ReadBuffer(); + buff.setStatus(ReadBufferStatus.READ_FAILED); + buff.setStream(inputStream); + bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff); + assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize()); + bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB, + inputStream.getTracingContext()); + assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize() + 1); + } + + @Test + public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception { Review Comment: Added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
