[
https://issues.apache.org/jira/browse/HADOOP-19622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033438#comment-18033438
]
ASF GitHub Bot commented on HADOOP-19622:
-----------------------------------------
anujmodi2021 commented on code in PR #7832:
URL: https://github.com/apache/hadoop/pull/7832#discussion_r2467911950
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -106,123 +166,731 @@ void init() {
executorServiceKeepAliveTimeInMilliSec,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
- namedThreadFactory);
+ workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, this);
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
workerPool.submit(worker);
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+ if (isDynamicScalingEnabled) {
+ cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(runnable -> {
+ Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+ getCpuMonitoringIntervalInMilliSec(),
getCpuMonitoringIntervalInMilliSec(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ printTraceLog("ReadBufferManagerV2 initialized with {} buffers and {} worker threads",
+ numberOfActiveBuffers, workerRefs.size());
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to queueing read-ahead.
+ * @param stream which read-ahead is requested from.
+ * @param requestedOffset The offset in the file which should be read.
+ * @param requestedLength The length to read.
*/
@Override
- public void queueReadAhead(final AbfsInputStream stream,
- final long requestedOffset,
- final int requestedLength,
- final TracingContext tracingContext) {
- // TODO: To be implemented
+ public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+ final int requestedLength, TracingContext tracingContext) {
+ printTraceLog("Start Queueing readAhead for file: {}, with eTag: {}, offset: {}, length: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset, requestedLength, stream.hashCode());
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+ // Already queued for this offset, so skip queuing.
+ printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as it is already queued",
+ stream.getPath(), stream.getETag(), requestedOffset, stream.hashCode());
+ return;
+ }
+ if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+ // No buffers are available and more buffers cannot be created. Skip queuing.
+ printTraceLog("Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+ stream.getPath(), stream.getETag(), requestedOffset, stream.hashCode());
+ return;
+ }
+
+ // Create a new ReadBuffer to keep the prefetched data and queue.
+ buffer = new ReadBuffer();
+ buffer.setStream(stream); // To map buffer with stream that requested it
+ buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+ buffer.setPath(stream.getPath());
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+ buffer.setTracingContext(tracingContext);
+
+ if (isFreeListEmpty()) {
+ /*
+ * By now there should be at least one buffer available.
+ * This is to double sure that after upscaling or eviction,
+ * we still have free buffer available. If not, we skip queueing.
+ */
+ return;
+ }
+ Integer bufferIndex = popFromFreeList();
+ buffer.setBuffer(bufferPool[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ getReadAheadQueue().add(buffer);
+ notifyAll();
+ printTraceLog("Done q-ing readAhead for file: {}, with eTag:{}, offset: {}, buffer idx: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset, buffer.getBufferindex(), stream.hashCode());
+ }
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
+ * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+ *
+ * @param stream of the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
*/
@Override
- public int getBlock(final AbfsInputStream stream,
- final long position,
- final int length,
- final byte[] buffer) throws IOException {
- // TODO: To be implemented
+ public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+ throws IOException {
+ // not synchronized, so have to be careful with locking
+ printTraceLog("getBlock request for file: {}, with eTag: {}, for position: {} for length: {} received from stream: {}",
+ stream.getPath(), stream.getETag(), position, length, stream.hashCode());
+
+ String requestedETag = stream.getETag();
+ boolean isFirstRead = stream.isFirstRead();
+
+ // Wait for any in-progress read to complete.
+ waitForProcess(requestedETag, position, isFirstRead);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(requestedETag, position, length, buffer);
+ }
+ if (bytesRead > 0) {
+ printTraceLog("Done read from Cache for the file with eTag: {}, position: {}, length: {}, requested by stream: {}",
+ requestedETag, position, bytesRead, stream.hashCode());
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
*/
@Override
public ReadBuffer getNextBlockToRead() throws InterruptedException {
- // TODO: To be implemented
- return null;
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ // Blocking Call to wait for prefetch to be queued.
+ while (getReadAheadQueue().size() == 0) {
+ wait();
+ }
+
+ buffer = getReadAheadQueue().remove();
+ notifyAll();
+ if (buffer == null) {
+ return null;
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ getInProgressList().add(buffer);
+ }
+ printTraceLog("ReadBufferWorker picked file: {}, with eTag: {}, for offset: {}, queued by stream: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(), buffer.getStream().hashCode());
+ return buffer;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this method to post completion. *
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
@Override
- public void doneReading(final ReadBuffer buffer,
- final ReadBufferStatus result,
+ public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
final int bytesActuallyRead) {
- // TODO: To be implemented
+ printTraceLog("ReadBufferWorker completed prefetch for file: {} with eTag: {}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(), buffer.getStream().hashCode(), result, bytesActuallyRead);
+ synchronized (this) {
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (getInProgressList().contains(buffer)) {
+ getInProgressList().remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ // Successful read, so update the buffer status and length
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ // Failed read, reuse buffer for next read, this buffer will be
+ // evicted later based on eviction policy.
+ pushToFreeList(buffer.getBufferindex());
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ getCompletedReadList().add(buffer);
+ }
+ }
+
+ //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
- * {@inheritDoc}
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManagerV2} when stream is closed.
+ * @param stream input stream.
*/
- @Override
- public void purgeBuffersForStream(final AbfsInputStream stream) {
- // TODO: To be implemented
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+ getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == stream);
+ purgeList(stream, getCompletedReadList());
+ }
+
+ private boolean isAlreadyQueued(final String eTag, final long requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+ || isInList(getInProgressList(), eTag, requestedOffset)
+ || isInList(getCompletedReadList(), eTag, requestedOffset));
+ }
+
+ private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
+ final long requestedOffset) {
+ return (getFromList(list, eTag, requestedOffset) != null);
+ }
+
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list, final String eTag,
+ final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (eTag.equals(buffer.getETag())) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset
+ < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
}
/**
- * {@inheritDoc}
+ * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
+ * @return whether the eviction succeeded - i.e., were we able to free up one buffer
*/
- @VisibleForTesting
- @Override
- public int getNumBuffers() {
- return numberOfActiveBuffers;
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (getCompletedReadList().size() <= 0) {
+ return false; // there are no evict-able buffers
+ }
+
+ long currentTimeInMs = currentTimeMillis();
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isFullyConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ // Failed read buffers (with buffer index=-1) that are older than
+ // thresholdAge should be cleaned up, but at the same time should not
+ // report successful eviction.
+ // Queue logic expects that a buffer is freed up for read ahead when
+ // eviction is successful, whereas a failed ReadBuffer would have released
+ // its buffer when its status was set to READ_FAILED.
+ long earliestBirthday = Long.MAX_VALUE;
+ ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if ((buf.getBufferindex() != -1)
+ && (buf.getTimeStamp() < earliestBirthday)) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ } else if ((buf.getBufferindex() == -1)
+ && (currentTimeInMs - buf.getTimeStamp()) > getThresholdAgeMilliseconds()) {
+ oldFailedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : oldFailedBuffers) {
+ manualEviction(buf);
+ }
+
+ if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds()) && (nodeToEvict != null)) {
+ return manualEviction(nodeToEvict);
+ }
+
+ printTraceLog("No buffer eligible for eviction");
+ // nothing can be evicted
+ return false;
+ }
+
+ private boolean evict(final ReadBuffer buf) {
+ if (buf.getRefCount() > 0) {
+ // If the buffer is still being read, then we cannot evict it.
+ printTraceLog(
+ "Cannot evict buffer with index: {}, file: {}, with eTag: {}, offset: {} as it is still being read by some input stream",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+ return false;
+ }
+ // As failed ReadBuffers (bufferIndx = -1) are saved in getCompletedReadList(),
+ // avoid adding it to availableBufferList.
+ if (buf.getBufferindex() != -1) {
+ pushToFreeList(buf.getBufferindex());
+ }
+ getCompletedReadList().remove(buf);
+ buf.setTracingContext(null);
+ printTraceLog(
+ "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.isFullyConsumed(), buf.isAnyByteConsumed());
+ return true;
+ }
+
+ private void waitForProcess(final String eTag, final long position, boolean isFirstRead) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+ if (readBuf == null) {
+ readBuf = getFromList(getInProgressList(), eTag, position);
+ }
+ }
+ if (readBuf != null) { // if in in-progress queue, then block for it
+ try {
+ printTraceLog("A relevant read buffer for file: {}, with eTag: {}, offset: {}, queued by stream: {}, having buffer idx: {} is being prefetched, waiting for latch",
+ readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(), readBuf.getStream().hashCode(), readBuf.getBufferindex());
+ readBuf.getLatch().await(); // blocking wait on the caller stream's thread
+ // Note on correctness: readBuf gets out of getInProgressList() only in 1 place: after worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // getInProgressList(). So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+ // then the latch cane be removed and replaced with wait/notify whenever getInProgressList() is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+ + "buffer index: {} queued by stream: {}", readBuf.getPath(), readBuf.getETag(),
+ readBuf.getOffset(), readBuf.getBufferindex(), readBuf.getStream().hashCode());
+ }
+ }
+
+ private ReadBuffer clearFromReadAheadQueue(final String eTag, final long requestedOffset, boolean isFirstRead) {
+ ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
+ /*
+ * If this prefetch was triggered by first read of this input stream,
+ * we should not remove it from queue and let it complete by backend threads.
+ */
+ if (buffer != null && isFirstRead) {
+ return buffer;
+ }
+ if (buffer != null) {
+ getReadAheadQueue().remove(buffer);
+ notifyAll(); // lock is held in calling method
+ pushToFreeList(buffer.getBufferindex());
+ }
+ return null;
}
+
+ private int getBlockFromCompletedQueue(final String eTag, final long position,
+ final int length, final byte[] buffer) throws IOException {
+ ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+ if (buf == null) {
+ return 0;
+ }
+
+ buf.startReading(); // atomic increment of refCount.
+
+ if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+ // To prevent new read requests to fail due to old read-ahead attempts,
+ // return exception only from buffers that failed within last getThresholdAgeMilliseconds()
+ if ((currentTimeMillis() - (buf.getTimeStamp()) < getThresholdAgeMilliseconds())) {
+ throw buf.getErrException();
+ } else {
+ return 0;
+ }
+ }
+
+ if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+ || (position >= buf.getOffset() + buf.getLength())) {
+ return 0;
+ }
+
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+
+ buf.endReading(); // atomic decrement of refCount
+ return lengthToCopy;
+ }
+
+ private ReadBuffer getBufferFromCompletedQueue(final String eTag, final long requestedOffset) {
+ for (ReadBuffer buffer : getCompletedReadList()) {
+ // Buffer is returned if the requestedOffset is at or above buffer's
+ // offset but less than buffer's length or the actual requestedLength
+ if (eTag.equals(buffer.getETag())
+ && (requestedOffset >= buffer.getOffset())
+ && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+ || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
+ return buffer;
+ }
+ }
+ return null;
+ }
+
+ private synchronized boolean tryMemoryUpscale() {
+ if (!isDynamicScalingEnabled) {
+ printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+ return false; // Dynamic scaling is disabled, so no upscaling.
+ }
+ double memoryLoad = getMemoryLoad();
+ if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
+ // Create and Add more buffers in getFreeList().
+ if (removedBufferList.isEmpty()) {
+ bufferPool[getNumBuffers()] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(getNumBuffers());
+ } else {
+ // Reuse a removed buffer index.
+ int freeIndex = removedBufferList.pop();
+ if (freeIndex >= bufferPool.length) {
+ printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+ freeIndex, bufferPool.length);
+ return false;
+ }
+ bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(freeIndex);
+ }
+ incrementActiveBufferCount();
+ printTraceLog("Current Memory Load: {}. Incrementing buffer pool size to {}", memoryLoad, getNumBuffers());
+ return true;
+ }
+ printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load: {}",
+ getNumBuffers(), memoryLoad);
+ return false;
+ }
+
+ private void scheduledEviction() {
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (currentTimeMillis() - buf.getTimeStamp() > getThresholdAgeMilliseconds()) {
+ // If the buffer is older than thresholdAge, evict it.
+ printTraceLog("Scheduled Eviction of Buffer Triggered for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, length: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(), buf.getLength(), buf.getStream().hashCode());
+ evict(buf);
+ }
+ }
+
+ double memoryLoad = getMemoryLoad();
+ if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+ synchronized (this) {
+ if (isFreeListEmpty()) {
+ printTraceLog("No free buffers available. Skipping downscale of buffer pool");
+ return; // No free buffers available, so cannot downscale.
+ }
+ int freeIndex = popFromFreeList();
+ bufferPool[freeIndex] = null;
+ removedBufferList.add(freeIndex);
+ decrementActiveBufferCount();
+ printTraceLog("Current Memory Load: {}. Decrementing buffer pool size to {}", memoryLoad, getNumBuffers());
+ }
+ }
+ }
+
+ private boolean manualEviction(final ReadBuffer buf) {
+ printTraceLog("Manual Eviction of Buffer Triggered for BufferIndex: {}, file: {}, with eTag: {}, offset: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(), buf.getStream().hashCode());
+ return evict(buf);
+ }
+
+ private void adjustThreadPool() {
+ int currentPoolSize = workerRefs.size();
+ double cpuLoad = getCpuLoad();
+ int requiredPoolSize = getRequiredThreadPoolSize();
+ int newThreadPoolSize;
+ printTraceLog("Current CPU load: {}, Current worker pool size: {}, Current queue size: {}", cpuLoad, currentPoolSize, requiredPoolSize);
+ if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+ // Submit more background tasks.
+ newThreadPoolSize = Math.min(maxThreadPoolSize,
+ (int) Math.ceil((currentPoolSize * (ONE_HUNDRED + threadPoolUpscalePercentage))/ONE_HUNDRED));
+ // Create new Worker Threads
+ for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
+ workerPool.submit(worker);
+ }
+ printTraceLog("Increased worker pool size from {} to {}", currentPoolSize, newThreadPoolSize);
+ } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
+ newThreadPoolSize = Math.max(minThreadPoolSize,
+ (int) Math.ceil((currentPoolSize * (ONE_HUNDRED - threadPoolDownscalePercentage))/ONE_HUNDRED));
+ // Signal the extra workers to stop
+ while (workerRefs.size() > newThreadPoolSize) {
+ ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+ worker.stop();
+ }
+ printTraceLog("Decreased worker pool size from {} to {}", currentPoolSize, newThreadPoolSize);
+ } else {
+ printTraceLog("No change in worker pool size. CPU load: {} Pool size: {}", cpuLoad, currentPoolSize);
+ }
+ }
+
/**
- * {@inheritDoc}
+ * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+ * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
+ * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core.
+ * Note: it is not monotonic across Sockets, and even within a CPU, its only the
+ * more recent parts which share a clock across all cores.
+ *
+ * @return current time in milliseconds
*/
- @VisibleForTesting
- @Override
- public void callTryEvict() {
- // TODO: To be implemented
+ private long currentTimeMillis() {
+ return System.nanoTime() / 1000 / 1000;
+ }
+
+ private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+ for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+ ReadBuffer readBuffer = it.next();
+ if (readBuffer.getStream() == stream) {
+ it.remove();
+ // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+ // list in doneReading method, we will skip adding those here again.
+ if (readBuffer.getBufferindex() != -1) {
+ pushToFreeList(readBuffer.getBufferindex());
+ }
+ }
+ }
}
/**
- * {@inheritDoc}
+ * Test method that can clean up the current state of readAhead buffers and
+ * the lists. Will also trigger a fresh init.
*/
@VisibleForTesting
@Override
public void testResetReadBufferManager() {
- // TODO: To be implemented
+ synchronized (this) {
+ ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf != null) {
+ completedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : completedBuffers) {
+ manualEviction(buf);
+ }
+
+ getReadAheadQueue().clear();
+ getInProgressList().clear();
+ getCompletedReadList().clear();
+ getFreeList().clear();
+ for (int i = 0; i < maxBufferPoolSize; i++) {
+ bufferPool[i] = null;
+ }
+ bufferPool = null;
+ cpuMonitorThread.shutdownNow();
+ memoryMonitorThread.shutdownNow();
+ workerPool.shutdownNow();
+ resetBufferManager();
+ }
}
- /**
- * {@inheritDoc}
- */
@VisibleForTesting
@Override
- public void testResetReadBufferManager(final int readAheadBlockSize,
- final int thresholdAgeMilliseconds) {
- // TODO: To be implemented
+ public void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) {
+ setReadAheadBlockSize(readAheadBlockSize);
+ setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+ testResetReadBufferManager();
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
- // TODO: To be implemented
+ @VisibleForTesting
+ public void callTryEvict() {
+ tryEvict();
}
- private final ThreadFactory namedThreadFactory = new ThreadFactory() {
- private int count = 0;
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "ReadAheadV2-Thread-" + count++);
+ @VisibleForTesting
+ public int getNumBuffers() {
+ LOCK.lock();
+ try {
+ return numberOfActiveBuffers;
+ } finally {
+ LOCK.unlock();
}
- };
+ }
@Override
void resetBufferManager() {
setBufferManager(null); // reset the singleton instance
+ setIsConfigured(false);
}
private static void setBufferManager(ReadBufferManagerV2 manager) {
bufferManager = manager;
}
+
+ private static void setIsConfigured(boolean configured) {
+ isConfigured = configured;
+ }
+
+ private final ThreadFactory workerThreadFactory = new ThreadFactory() {
+ private int count = 0;
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count++);
+ t.setDaemon(true);
+ return t;
+ }
+ };
+
+ private void printTraceLog(String message, Object... args) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(message, args);
+ }
+ }
+
+ private void printDebugLog(String message, Object... args) {
+ LOGGER.debug(message, args);
+ }
+
+ @VisibleForTesting
+ double getMemoryLoad() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ }
+
+ @VisibleForTesting
+ public double getCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+ OperatingSystemMXBean.class);
+ return osBean.getSystemCpuLoad();
Review Comment:
Nice catch
Taken
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -87,17 +111,53 @@ static ReadBufferManagerV2 getBufferManager() {
}
/**
- * {@inheritDoc}
+ * Set the ReadBufferManagerV2 configurations based on the provided before singleton initialization.
+ * @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2.
+ * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
+ */
+ public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
+ final AbfsConfiguration abfsConfiguration) {
+ // Set Configs only before initializations.
+ if (bufferManager == null && !isConfigured) {
+ minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+ maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+ cpuMonitoringIntervalInMilliSec = abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
+ cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()/ ONE_HUNDRED;
+ threadPoolUpscalePercentage = abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
+ threadPoolDownscalePercentage = abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
+ executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+ minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
Review Comment:
Taken
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -128,13 +128,20 @@ public final class FileSystemConfigurations {
public static final long
DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
- public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
+ public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = true;
+ public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = true;
Review Comment:
Taken
> ABFS: [ReadAheadV2] Implement Read Buffer Manager V2 with improved
> aggressiveness
> ---------------------------------------------------------------------------------
>
> Key: HADOOP-19622
> URL: https://issues.apache.org/jira/browse/HADOOP-19622
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]