monthonk commented on code in PR #4458:
URL: https://github.com/apache/hadoop/pull/4458#discussion_r902344956


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java:
##########
@@ -92,9 +100,11 @@ public CachingBlockManager(
     this.numCachingErrors = new AtomicInteger();
     this.numReadErrors = new AtomicInteger();
     this.cachingDisabled = new AtomicBoolean();
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

Review Comment:
   We use `Validate.checkNotNull` for `futurePool`; why use a different function 
for `prefetchingStatistics`? Also, please add the exception it can throw to the 
doc comment.
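
For illustration, a minimal sketch of how the two styles differ. It assumes `requireNonNull` in the diff is a static import of `java.util.Objects.requireNonNull`, and that the `Validate` helpers report failures as `IllegalArgumentException` (as the existing `@throws` lines for `checkPositiveInteger` suggest). The `checkNotNull` method below is a hypothetical stand-in, not the Hadoop class.

```java
import java.util.Objects;

public class NullCheckStyles {

  // Hypothetical stand-in for a Validate-style helper; NOT the Hadoop class.
  static <T> T checkNotNull(T obj, String argName) {
    if (obj == null) {
      throw new IllegalArgumentException("'" + argName + "' must not be null");
    }
    return obj;
  }

  // Returns the simple name of the exception raised by the action, or "none".
  static String thrownBy(Runnable action) {
    try {
      action.run();
      return "none";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    Object stats = null;
    // java.util.Objects.requireNonNull raises NullPointerException.
    System.out.println(
        thrownBy(() -> Objects.requireNonNull(stats, "prefetchingStatistics")));
    // The Validate-style stand-in raises IllegalArgumentException.
    System.out.println(
        thrownBy(() -> checkNotNull(stats, "prefetchingStatistics")));
  }
}
```

Mixing the two in one constructor means callers see different exception types for the same kind of mistake, which is the consistency concern here.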



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java:
##########
@@ -81,7 +85,13 @@ public String toString() {
     }
   }
 
-  public SingleFilePerBlockCache() {
+  /**
+   * Constructs an instance of a {@code SingleFilePerBlockCache}.
+   *
+   * @param prefetchingStatistics statistics for this stream.
+   */
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

Review Comment:
   Consider changing this to `Validate.checkNotNull`, and document the 
exception it can throw.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java:
##########
@@ -56,26 +58,32 @@ public class BufferPool implements Closeable {
   // Allows associating metadata to each buffer in the pool.
   private Map<BufferData, ByteBuffer> allocated;
 
+  private PrefetchingStatistics prefetchingStatistics;
+
   /**
    * Initializes a new instance of the {@code BufferPool} class.
    *
    * @param size number of buffer in this pool.
    * @param bufferSize size in bytes of each buffer.
+   * @param prefetchingStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if size is zero or negative.
    * @throws IllegalArgumentException if bufferSize is zero or negative.
    */
-  public BufferPool(int size, int bufferSize) {
+  public BufferPool(int size, int bufferSize, PrefetchingStatistics prefetchingStatistics) {
     Validate.checkPositiveInteger(size, "size");
     Validate.checkPositiveInteger(bufferSize, "bufferSize");
 
     this.size = size;
     this.bufferSize = bufferSize;
     this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

Review Comment:
   What exception is thrown if it is null? Please add it to the doc comment.
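
A sketch of what the documented contract could look like, assuming `requireNonNull` is `java.util.Objects.requireNonNull`, which throws `NullPointerException`. `PoolSketch` and its `Object`-typed `statistics` parameter are hypothetical simplifications, not the real `BufferPool`.

```java
import java.util.Objects;

public class PoolSketch {
  private final int size;
  private final Object statistics;

  /**
   * Initializes a new instance of the {@code PoolSketch} class.
   *
   * @param size number of buffers in this pool.
   * @param statistics statistics for this stream.
   *
   * @throws IllegalArgumentException if size is zero or negative.
   * @throws NullPointerException if statistics is null.
   */
  public PoolSketch(int size, Object statistics) {
    if (size <= 0) {
      throw new IllegalArgumentException("'size' must be a positive integer");
    }
    this.size = size;
    // The message argument becomes the NullPointerException's message.
    this.statistics = Objects.requireNonNull(statistics, "statistics");
  }

  // Returns the exception message seen when null statistics are passed.
  static String messageForNullStatistics() {
    try {
      new PoolSketch(1, null);
      return "no exception";
    } catch (NullPointerException e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(messageForNullStatistics());
  }
}
```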



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java:
##########
@@ -1281,6 +1313,38 @@ public DurationTracker initiateInnerStreamClose(final boolean abort) {
           ? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
           : StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
     }
+
+    @Override
+    public void prefetchOperationStarted() {
+      incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
+      prefetchReadOperations.incrementAndGet();
+    }
+
+    @Override
+    public void blockAddedToFileCache() {
+      incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1);
+    }
+
+    @Override
+    public void blockRemovedFromFileCache() {
+      incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
+    }
+
+    @Override
+    public void prefetchOperationCompleted() {
+      incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
+    }
+
+

Review Comment:
   nit: delete the extra blank line



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java:
##########
@@ -208,6 +208,43 @@ public void unbuffered() {
 
     }
 
+
+    @Override
+    public void prefetchOperationStarted() {
+
+    }
+
+

Review Comment:
   nit: delete the extra blank line



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java:
##########
@@ -117,8 +120,8 @@ public void seek(long pos) throws IOException {
 
   @Override
   public void close() throws IOException {
-    super.close();
     this.blockManager.close();
+    super.close();

Review Comment:
   Why move it here? Please add a comment if the ordering is something others 
should be aware of.
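
One plausible reason for the reordering, offered as an assumption rather than something confirmed in this thread: if `super.close()` finalizes the stream statistics, any updates the block manager makes while closing would be lost unless it closes first. A sketch with hypothetical classes (`Stats`, `BlockManager`, `BaseStream`, `CachingStream`), showing the kind of comment that could be added:

```java
import java.io.Closeable;

public class CloseOrderSketch {

  /** Hypothetical statistics sink that ignores updates once closed. */
  static class Stats implements Closeable {
    private boolean closed;
    private int recorded;

    void record() {
      if (!closed) {
        recorded++;
      }
    }

    int recorded() {
      return recorded;
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Hypothetical block manager that reports one last update on close. */
  static class BlockManager implements Closeable {
    private final Stats stats;

    BlockManager(Stats stats) {
      this.stats = stats;
    }

    @Override
    public void close() {
      stats.record();
    }
  }

  /** Hypothetical base stream whose close() finalizes the statistics. */
  static class BaseStream implements Closeable {
    protected final Stats stats = new Stats();

    @Override
    public void close() {
      stats.close();
    }
  }

  static class CachingStream extends BaseStream {
    private final BlockManager blockManager = new BlockManager(stats);

    @Override
    public void close() {
      // Close the block manager first: it still updates the statistics,
      // which super.close() finalizes.
      blockManager.close();
      super.close();
    }
  }

  // Number of updates recorded when the block manager is closed first.
  static int recordedAfterClose() {
    CachingStream stream = new CachingStream();
    stream.close();
    return stream.stats.recorded();
  }

  public static void main(String[] args) {
    System.out.println(recordedAfterClose());
  }
}
```

With the calls in the opposite order, the final update in this sketch would be silently dropped.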



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -674,6 +674,46 @@ private Constants() {
   public static final String STREAM_READ_GAUGE_INPUT_POLICY =
       "stream_read_gauge_input_policy";
 
+  /**
+   * Total number of prefetching operations executed.
+   */
+  public static final String STREAM_READ_PREFETCH_OPERATIONS

Review Comment:
   I think these should go in the common package, since `PrefetchingStatistics` 
is already there. Maybe create a new class for these constants.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java:
##########
@@ -208,6 +208,43 @@ public void unbuffered() {
 
     }
 
+

Review Comment:
   nit: delete the extra blank line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

