[
https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18008002#comment-18008002
]
ASF GitHub Bot commented on HADOOP-19613:
-----------------------------------------
anmolanmol1234 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2215380532
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * The Read Buffer Manager for Rest AbfsClient.
+ * V1 implementation of ReadBufferManager.
+ */
+final class ReadBufferManagerV1 implements ReadBufferManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(
+ ReadBufferManagerV1.class);
+ private static final int ONE_KB = 1024;
+ private static final int ONE_MB = ONE_KB * ONE_KB;
+
+ private static final int NUM_BUFFERS = 16;
+ private static final int NUM_THREADS = 8;
+ private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have
to see if 3 seconds is a good threshold
+
+ private static int blockSize = 4 * ONE_MB;
+ private static int thresholdAgeMilliseconds =
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
+ private Thread[] threads = new Thread[NUM_THREADS];
+ private byte[][] buffers; // array of byte[] buffers, to hold the data
that is read
+ private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
+
+ private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of
requests that are not picked up by any worker thread yet
+ private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); //
requests being processed by worker threads
+ private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); //
buffers available for reading
+ private static ReadBufferManagerV1 bufferManager; // singleton, initialized
in static initialization block
+ private static final ReentrantLock LOCK = new ReentrantLock();
+
+ static ReadBufferManagerV1 getBufferManager() {
+ if (bufferManager == null) {
+ LOCK.lock();
+ try {
+ if (bufferManager == null) {
+ bufferManager = new ReadBufferManagerV1();
+ bufferManager.init();
+ }
+ } finally {
+ LOCK.unlock();
+ }
+ }
+ return bufferManager;
+ }
+
+ static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+ if (bufferManager == null) {
+ LOGGER.debug(
+ "ReadBufferManagerV1 not initialized yet. Overriding
readAheadBlockSize as {}",
+ readAheadBlockSize);
+ blockSize = readAheadBlockSize;
+ }
+ }
+
+ private void init() {
+ buffers = new byte[NUM_BUFFERS][];
+ for (int i = 0; i < NUM_BUFFERS; i++) {
+ buffers[i] = new byte[blockSize]; // same buffers are reused. The byte
array never goes back to GC
+ freeList.add(i);
+ }
+ for (int i = 0; i < NUM_THREADS; i++) {
+ Thread t = new Thread(new ReadBufferWorker(i));
+ t.setDaemon(true);
+ threads[i] = t;
+ t.setName("ABFS-prefetch-" + i);
+ t.start();
+ }
+ ReadBufferWorker.UNLEASH_WORKERS.countDown();
+ }
+
+ // hide instance constructor
+ private ReadBufferManagerV1() {
+ LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+ }
+
+ /**
+ * {@link AbfsInputStream} calls this method to queue read-aheads.
+ *
+ * @param stream The {@link AbfsInputStream} for which to do the
read-ahead
+ * @param requestedOffset The offset in the file which should be read
+ * @param requestedLength The length to read
Review Comment:
The tracing context parameter is missing from the @param list.
> ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code with the
> current working code
> ------------------------------------------------------------------------------------------------
>
> Key: HADOOP-19613
> URL: https://issues.apache.org/jira/browse/HADOOP-19613
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Anuj Modi
> Assignee: Anuj Modi
> Priority: Major
> Labels: pull-request-available
>
> The Read Buffer Manager used today was introduced long ago and has been
> stable for quite a while.
> The Read Buffer Manager to be introduced as part of
> https://issues.apache.org/jira/browse/HADOOP-19596 will introduce many
> changes incrementally over time. While development goes on, and until we are
> able to fully stabilise the optimized version, we need the current flow to
> remain functional and undisturbed.
> This work item is to isolate the current flow from the new code by
> refactoring the ReadBufferManager class into 2 different implementations with
> the same public interfaces: ReadBufferManagerV1 and ReadBufferManagerV2.
> It will also introduce new configs that can be used to toggle between the new
> and old code.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]