[
https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033432#comment-18033432
]
ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------
anujmodi2021 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2467900691
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java:
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit Tests around different components of Read Buffer Manager V2
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+ private volatile boolean running = true;
+ private final List<byte[]> allocations = new ArrayList<>();
+
+
+ public TestReadBufferManagerV2() throws Exception {
+ super();
+ }
+
+ /**
+ * Test to verify init of ReadBufferManagerV2
+ * @throws Exception if test fails
+ */
+ @Test
+ public void testReadBufferManagerV2Init() throws Exception {
+
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
getConfiguration());
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ assertThat(ReadBufferManagerV2.getInstance())
+ .as("ReadBufferManager should be uninitialized").isNull();
+ intercept(IllegalStateException.class, "ReadBufferManagerV2 is not
configured.", () -> {
+ ReadBufferManagerV2.getBufferManager();
+ });
+ // verify that multiple invocations of getBufferManager return the same
instance.
+
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
getConfiguration());
+ ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager2 =
ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+ assertThat(bufferManager).isNotNull();
+ assertThat(bufferManager2).isNotNull();
+ assertThat(bufferManager).isSameAs(bufferManager2);
+ assertThat(bufferManager3).isNotNull();
+ assertThat(bufferManager3).isSameAs(bufferManager);
+
+ // Verify default values are not invalid.
+ assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+ assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+ }
+
+ /**
+ * Test to verify that cpu monitor thread is not active if disabled.
+ * @throws Exception if test fails
+ */
+ @Test
+ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+ Configuration conf = new Configuration(getRawConfiguration());
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+ try(AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsConfiguration abfsConfiguration =
fs.getAbfsStore().getAbfsConfiguration();
+
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
abfsConfiguration);
+ ReadBufferManagerV2 bufferManagerV2 =
ReadBufferManagerV2.getBufferManager();
+ assertThat(bufferManagerV2.getCpuMonitoringThread())
+ .as("CPU Monitor thread should be initialized").isNotNull();
+ bufferManagerV2.resetBufferManager();
+ }
+
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+ try(AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsConfiguration abfsConfiguration =
fs.getAbfsStore().getAbfsConfiguration();
+
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
abfsConfiguration);
+ ReadBufferManagerV2 bufferManagerV2 =
ReadBufferManagerV2.getBufferManager();
+ assertThat(bufferManagerV2.getCpuMonitoringThread())
+ .as("CPU Monitor thread should not be initialized").isNull();
+ bufferManagerV2.resetBufferManager();
+ }
+ }
+
+ @Test
+ public void testThreadPoolDynamicScaling() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
Review Comment:
Nice catch
Taken
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
executorServiceKeepAliveTimeInMilliSec,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
- namedThreadFactory);
+ workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, this);
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
workerPool.submit(worker);
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+ if (isDynamicScalingEnabled) {
+ cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable
-> {
+ Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+ getCpuMonitoringIntervalInMilliSec(),
getCpuMonitoringIntervalInMilliSec(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {}
worker threads",
+ numberOfActiveBuffers, workerRefs.size());
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to queue a read-ahead.
+ * @param stream which read-ahead is requested from.
+ * @param requestedOffset The offset in the file which should be read.
+ * @param requestedLength The length to read.
*/
@Override
- public void queueReadAhead(final AbfsInputStream stream,
- final long requestedOffset,
- final int requestedLength,
- final TracingContext tracingContext) {
- // TODO: To be implemented
+ public void queueReadAhead(final AbfsInputStream stream, final long
requestedOffset,
+ final int requestedLength, TracingContext tracingContext) {
+ printTraceLog("Start Queueing readAhead for file: {}, with eTag: {},
offset: {}, length: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset, requestedLength,
stream.hashCode());
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+ // Already queued for this offset, so skip queuing.
+ printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {},
offset: {}, triggered by stream: {} as it is already queued",
+ stream.getPath(), stream.getETag(), requestedOffset,
stream.hashCode());
+ return;
+ }
+ if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+ // No buffers are available and more buffers cannot be created. Skip
queuing.
+ printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {},
offset: {}, triggered by stream: {} as no buffers are available",
+ stream.getPath(), stream.getETag(), requestedOffset,
stream.hashCode());
+ return;
+ }
+
+ // Create a new ReadBuffer to keep the prefetched data and queue.
+ buffer = new ReadBuffer();
+ buffer.setStream(stream); // To map buffer with stream that requested it
+ buffer.setETag(stream.getETag()); // To map buffer with file it belongs
to
+ buffer.setPath(stream.getPath());
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+ buffer.setTracingContext(tracingContext);
+
+ if (isFreeListEmpty()) {
+ /*
+ * By now there should be at least one buffer available.
+ * This is to make doubly sure that after upscaling or eviction,
+ * we still have free buffer available. If not, we skip queueing.
+ */
+ return;
+ }
+ Integer bufferIndex = popFromFreeList();
+ buffer.setBuffer(bufferPool[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ getReadAheadQueue().add(buffer);
+ notifyAll();
+ printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset:
{}, buffer idx: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset,
buffer.getBufferindex(), stream.hashCode());
+ }
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to read any bytes already
available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in
buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes.
If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead
and reports cache miss. This is because
+ * depending on worker thread availability, the read-ahead may take a while
- the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in queue for an
indeterminate amount of time).
+ *
+ * @param stream of the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will
be written into from offset 0.
+ * @return the number of bytes read
*/
@Override
- public int getBlock(final AbfsInputStream stream,
- final long position,
- final int length,
- final byte[] buffer) throws IOException {
- // TODO: To be implemented
+ public int getBlock(final AbfsInputStream stream, final long position, final
int length, final byte[] buffer)
+ throws IOException {
+ // not synchronized, so have to be careful with locking
+ printTraceLog("getBlock request for file: {}, with eTag: {}, for position:
{} for length: {} received from stream: {}",
+ stream.getPath(), stream.getETag(), position, length,
stream.hashCode());
+
+ String requestedETag = stream.getETag();
+ boolean isFirstRead = stream.isFirstRead();
+
+ // Wait for any in-progress read to complete.
+ waitForProcess(requestedETag, position, isFirstRead);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(requestedETag, position, length,
buffer);
+ }
+ if (bytesRead > 0) {
+ printTraceLog("Done read from Cache for the file with eTag: {},
position: {}, length: {}, requested by stream: {}",
+ requestedETag, position, bytesRead, stream.hashCode());
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this to get the next buffer that it
should work on.
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
*/
@Override
public ReadBuffer getNextBlockToRead() throws InterruptedException {
- // TODO: To be implemented
- return null;
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ // Blocking Call to wait for prefetch to be queued.
+ while (getReadAheadQueue().size() == 0) {
+ wait();
+ }
+
+ buffer = getReadAheadQueue().remove();
+ notifyAll();
+ if (buffer == null) {
+ return null;
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ getInProgressList().add(buffer);
+ }
+ printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for
offset: {}, queued by stream: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
buffer.getStream().hashCode());
+ return buffer;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this method to post completion. *
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read
operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was
actually able to read
*/
@Override
- public void doneReading(final ReadBuffer buffer,
- final ReadBufferStatus result,
+ public void doneReading(final ReadBuffer buffer, final ReadBufferStatus
result,
final int bytesActuallyRead) {
- // TODO: To be implemented
+ printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag:
{}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
buffer.getStream().hashCode(), result, bytesActuallyRead);
+ synchronized (this) {
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (getInProgressList().contains(buffer)) {
+ getInProgressList().remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ // Successful read, so update the buffer status and length
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ // Failed read, reuse buffer for next read, this buffer will be
+ // evicted later based on eviction policy.
+ pushToFreeList(buffer.getBufferindex());
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ getCompletedReadList().add(buffer);
+ }
+ }
+
+ //outside the synchronized, since anyone receiving a wake-up from the
latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
- * {@inheritDoc}
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManagerV2} when stream is closed.
+ * @param stream input stream.
*/
- @Override
- public void purgeBuffersForStream(final AbfsInputStream stream) {
- // TODO: To be implemented
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+ getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() ==
stream);
+ purgeList(stream, getCompletedReadList());
+ }
+
+ private boolean isAlreadyQueued(final String eTag, final long
requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+ || isInList(getInProgressList(), eTag, requestedOffset)
+ || isInList(getCompletedReadList(), eTag, requestedOffset));
+ }
+
+ private boolean isInList(final Collection<ReadBuffer> list, final String
eTag,
Review Comment:
Added
> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved
> aggressiveness
> ---------------------------------------------------------------------------------
>
> Key: HADOOP-19622
> URL: https://issues.apache.org/jira/browse/HADOOP-19622
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]