[ https://issues.apache.org/jira/browse/HADOOP-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18038067#comment-18038067 ]

ASF GitHub Bot commented on HADOOP-19364:
-----------------------------------------

steveloughran commented on code in PR #8007:
URL: https://github.com/apache/hadoop/pull/8007#discussion_r2523839488


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -88,6 +101,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
 
     S3AFileSystem fs =
        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+
+   long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+   // Head request for the file length.
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 1);

Review Comment:
   get the iostats and the current value on L104; assert the execution matches original + 1; avoids a problem we've hit elsewhere
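
   A minimal sketch of that pattern (the `requestsBefore` local is illustrative, not from the PR):

       long requestsBefore = fs.getIOStatistics().counters()
           .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);

       long fileLength = fs.getFileStatus(externalTestFile).getLen();

       // exactly one extra audited request: the HEAD for the file length
       verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
           requestsBefore + 1);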



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -36,9 +37,20 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+

Review Comment:
   nit: cut this line



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,17 +124,90 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
      Assertions.assertThat(objectInputStream.getInputPolicy())
          .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since policy is WHOLE_FILE, the whole file starts getting prefetched as soon as the stream to it is opened.
+    // So prefetched bytes is fileLen - 5
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
     fs.close();
    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
 
    // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
    // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
    // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
    // [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);

Review Comment:
   do the same request counting as we now have in `testMultiRowGroupParquet()`; less brittle
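
   i.e. capture a baseline and derive the expected GET count from the file length rather than hard-coding it; a sketch (not a verbatim copy of that test):

       long requestsBefore = fs.getIOStatistics().counters()
           .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);
       // ... open the stream and read through the file ...
       // one HEAD on open, plus one GET per 8MB part prefetched to EoF
       long partSize = 8 * ONE_MB;
       long expectedGets = (fileLength + partSize - 1) / partSize;
       verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
           requestsBefore + 1 + expectedGets);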



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -193,13 +204,19 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
       // open the stream.
       in.read();
+
      // now examine the innermost stream and make sure it doesn't have a checksum
-      assertStreamIsNotChecksummed(getS3AInputStream(in));
+        assertStreamIsNotChecksummed(getS3AInputStream(in));

Review Comment:
   nit, revert



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -247,10 +270,13 @@ private void onReadFailure(IOException ioe) throws IOException {
   }
 
  private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+    final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
    OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
         OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-            .getInputPolicy()));
+            .getInputPolicy())).requestCallback(requestCallback);

Review Comment:
   nit: put on a new line
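
   i.e. something like:

       OpenStreamInformation.builder()
           .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
               .getInputPolicy()))
           .requestCallback(requestCallback);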



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,17 +124,90 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
      Assertions.assertThat(objectInputStream.getInputPolicy())
          .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since policy is WHOLE_FILE, the whole file starts getting prefetched as soon as the stream to it is opened.
+    // So prefetched bytes is fileLen - 5
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
     fs.close();
    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
 
    // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
    // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
    // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
    // [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);
+  }
+
+  @Test
+  public void testSequentialPrefetching() throws IOException {
+
+    Configuration conf = getConfiguration();
+
+    // AAL uses a caffeine cache, and expires any prefetched data for a key 1s after it was last accessed by default.
+    // While this works well when running on EC2, for local testing, it can take more than 1s to download large chunks
+    // of data. Set this value higher for testing to prevent early cache evictions.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "."  + AAL_CACHE_TIMEOUT, 10000);
+
+    S3AFileSystem fs =
+            (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+    byte[] buffer = new byte[10 * ONE_MB];
+    IOStatistics ioStats;
+
+    long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+    // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE policy. Instead, we rely
+    // on AAL detecting a sequential pattern being read, and then prefetching bytes in a geometrical progression.
+    // AAL's sequential prefetching starts prefetching in increments of 4MB, 8MB, 16MB etc. depending on how many
+    // sequential reads happen.
+    try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+      ioStats = inputStream.getIOStatistics();
+
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The first sequential read, so prefetch the next 4MB.
+      inputStream.readFully(buffer,   0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 *  ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+      // Two cache hits, as the previous two reads were already prefetched.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+
+      // Another sequential read, GP will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // Cache hit is still 2, as the previous read required a new GET request as it was outside the previously fetched
+      // 4MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+      // A total of 10MB is prefetched - 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);

Review Comment:
   so we are reading 10 MB of data. I wonder if we should consider this a scale test?
   
   I'd say no as
   * it's reading, not writing
   * everyone's networks should be faster in the decade+ since the Huge tests were first written
   * having scale off means it gets run more often



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    // Set the coalesce tolerance to 1KB, default is 1MB.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +

Review Comment:
   still need those `removeBaseAndBufferOverrides()` calls + disable fs caching
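
   e.g. in `createConfiguration()`, something like the following (`S3ATestUtils` provides both helpers; `keysSetByThisTest` is a placeholder for whichever option names the test overrides):

       Configuration conf = super.createConfiguration();
       // clear any base/bucket overrides so the values set below always apply
       S3ATestUtils.removeBaseAndBufferOverrides(conf, keysSetByThisTest);
       // force a fresh FS instance so the modified configuration is picked up
       S3ATestUtils.disableFilesystemCaching(conf);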



##########
hadoop-project/pom.xml:
##########
@@ -211,7 +211,7 @@
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
    <aws-java-sdk-v2.version>2.35.4</aws-java-sdk-v2.version>
    <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-    <amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
+    <amazon-s3-analyticsaccelerator-s3.version>1.3.1</amazon-s3-analyticsaccelerator-s3.version>

Review Comment:
   split this into its own patch and update LICENSE-binary in it too



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -439,18 +455,27 @@ public void testVectorReadPastEOF() throws Throwable {
         byte[] buf = new byte[longLen];
         ByteBuffer bb = ByteBuffer.wrap(buf);
         final FileRange range = FileRange.createFileRange(0, longLen);
-        in.readVectored(Arrays.asList(range), (i) -> bb);
-        interceptFuture(EOFException.class,
-            "",
-            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
-            TimeUnit.SECONDS,
-            range.getData());
-        assertS3StreamClosed(in);
-        return "vector read past EOF with " + in;
+
+        // For AAL, if there is no eTag, the provided length will not be passed in, and a HEAD request will be made.
+        // AAL requires the etag to detect changes in the object and then do cache eviction if required.
+        if (isAnalyticsStream()) {
+          intercept(EOFException.class, () -> in.readVectored(Arrays.asList(range), (i) -> bb));

Review Comment:
   nit, split in.readVectored() onto the next line
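
   i.e.:

       intercept(EOFException.class, () ->
           in.readVectored(Arrays.asList(range), (i) -> bb));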



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -261,10 +277,12 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
       }
     },
         always(),
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
+         // two GET calls were made, one for readFully,

Review Comment:
   nit, revert





> S3A Analytics-Accelerator: Add IoStatistics support
> ---------------------------------------------------
>
>                 Key: HADOOP-19364
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19364
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3A provides InputStream statistics: 
> [https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java]
> This helps track things like how many bytes were read from a stream etc. 
>  
> The current integration does not implement statistics. To start off with, we 
> should identify which of these statistics make sense for us to track in the 
> new stream. Some examples are:
>  
> 1/ bytesRead
> 2/ readOperationStarted
> 3/ initiateGetRequest
>  
> Some of these (1 and 2) are more straightforward, and should not require any 
> changes to analytics-accelerator-s3, but tracking GET requests will require 
> this. 
> We should also add tests that make assertions on these statistics. See 
> ITestS3APrefetchingInputStream for an example of how to do this. 
> And see https://issues.apache.org/jira/browse/HADOOP-18190 for how this was 
> done on the prefetching stream, and PR: 
> https://github.com/apache/hadoop/pull/4458
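
A rough sketch of the callback bridge described above, based on the `AnalyticsRequestCallback(getS3AStreamStatistics())` wiring visible in the PR; the `RequestCallback` hook and the statistics call below are named after the examples in this issue and are assumptions, not confirmed signatures:

    class AnalyticsRequestCallback implements RequestCallback {
      private final S3AInputStreamStatistics statistics;

      AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
        this.statistics = statistics;
      }

      // hypothetical hook: invoked by AAL each time it issues a GET
      @Override
      public void onGetRequest() {
        statistics.initiateGetRequest();  // statistic named in this issue
      }
    }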


