steveloughran commented on code in PR #8007:
URL: https://github.com/apache/hadoop/pull/8007#discussion_r2523839488


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -88,6 +101,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
 
     S3AFileSystem fs =
         (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+
+   long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+   // Head request for the file length.
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 1);

Review Comment:
   get the iostats and the current counter value on L104; assert the execution count matches the original + 1; this avoids a problem we've hit elsewhere
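   
   Something along these lines (just a sketch; assumes `lookupCounterStatistic` from `IOStatisticAssertions`):
   ```java
   // capture the baseline before the call which should trigger the HEAD
   final IOStatistics fsStats = fs.getIOStatistics();
   final long requestsBefore = lookupCounterStatistic(fsStats, AUDIT_REQUEST_EXECUTION);
   
   long fileLength = fs.getFileStatus(externalTestFile).getLen();
   
   // the getFileStatus() probe should have added exactly one request (the HEAD)
   verifyStatisticCounterValue(fsStats, AUDIT_REQUEST_EXECUTION, requestsBefore + 1);
   ```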



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -36,9 +37,20 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+

Review Comment:
   nit: cut this line



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,17 +124,90 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
      Assertions.assertThat(objectInputStream.getInputPolicy())
          .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since policy is WHOLE_FILE, the whole file starts getting prefetched as soon as the stream to it is opened.
+    // So prefetched bytes is fileLen - 5
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
     fs.close();
     verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
 
     // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
     // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
     // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
     // [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);

Review Comment:
   do the same request counting as we now have in `testMultiRowGroupParquet()`; less brittle
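   
   Roughly this shape (a sketch only, not necessarily identical to `testMultiRowGroupParquet()`):
   ```java
   // count requests relative to the baseline, rather than hard-coding a total
   final long requestsBefore = lookupCounterStatistic(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION);
   
   // ... open the stream and read through the file ...
   
   // assert only on the delta, e.g. the 3 GETs issued by WHOLE_FILE prefetching of the ~21MB file
   verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, requestsBefore + 3);
   ```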



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -193,13 +204,19 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
       // open the stream.
       in.read();
+
       // now examine the innermost stream and make sure it doesn't have a checksum
-      assertStreamIsNotChecksummed(getS3AInputStream(in));
+        assertStreamIsNotChecksummed(getS3AInputStream(in));

Review Comment:
   nit, revert



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -247,10 +270,13 @@ private void onReadFailure(IOException ioe) throws IOException {
   }
 
   private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+    final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
     OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
         OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-            .getInputPolicy()));
+            .getInputPolicy())).requestCallback(requestCallback);

Review Comment:
   nit: put on a new line
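   
   i.e. something like:
   ```java
   OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
       OpenStreamInformation.builder()
           .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
           .getInputPolicy()))
           .requestCallback(requestCallback);
   ```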



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,17 +124,90 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
      Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
      Assertions.assertThat(objectInputStream.getInputPolicy())
          .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since policy is WHOLE_FILE, the whole file starts getting prefetched as soon as the stream to it is opened.
+    // So prefetched bytes is fileLen - 5
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
     fs.close();
     verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
 
     // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
     // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
     // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
     // [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);
+  }
+
+  @Test
+  public void testSequentialPrefetching() throws IOException {
+
+    Configuration conf = getConfiguration();
+
+    // AAL uses a caffeine cache, and expires any prefetched data for a key 1s after it was last accessed by default.
+    // While this works well when running on EC2, for local testing, it can take more than 1s to download large chunks
+    // of data. Set this value to higher for testing to prevent early cache evictions.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "."  + AAL_CACHE_TIMEOUT, 10000);
+
+    S3AFileSystem fs =
+            (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+    byte[] buffer = new byte[10 * ONE_MB];
+    IOStatistics ioStats;
+
+    long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+    // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE policy. Instead, we rely
+    // on AAL detecting a sequential pattern being read, and then prefetching bytes in a geometrical progression.
+    // AAL's sequential prefetching starts prefetching in increments 4MB, 8MB, 16MB etc. depending on how many
+    // sequential reads happen.
+    try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+      ioStats = inputStream.getIOStatistics();
+
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The first sequential read, so prefetch the next 4MB.
+      inputStream.readFully(buffer,   0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 *  ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+      // Two cache hits, as the previous two reads were already prefetched.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+
+      // Another sequential read, GP will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // Cache hit is still 2, as the previous read required a new GET request as it was outside the previously fetched
+      // 4MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+      // A total of 10MB is prefetched - 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);

Review Comment:
   so we are reading 10 MB of data. I wonder if we should consider this a scale test?
   
   I'd say no, as:
   * it's reading, not writing
   * everyone's networks should be faster in the decade+ since the Huge tests were first written
   * having scale off means it gets run more often



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    // Set the coalesce tolerance to 1KB, default is 1MB.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +

Review Comment:
   still need those `removeBaseAndBucketOverrides()` calls
   + disable fs caching
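   
   A rough sketch of what I mean (assuming the `S3ATestUtils` helpers; the option name below is a placeholder for whichever AAL key the `conf.setInt()` above sets):
   ```java
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     // hypothetical: the fully qualified AAL option being overridden below
     String coalesceOption = ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_COALESCE_TOLERANCE;
     // clear base/per-bucket overrides so the explicit value always takes effect
     removeBaseAndBucketOverrides(conf, coalesceOption);
     // make sure a new FS instance is created with this config rather than served from the cache
     disableFilesystemCaching(conf);
     conf.setInt(coalesceOption, 1024);
     return conf;
   }
   ```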



##########
hadoop-project/pom.xml:
##########
@@ -211,7 +211,7 @@
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
    <aws-java-sdk-v2.version>2.35.4</aws-java-sdk-v2.version>
    <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-    <amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
+    <amazon-s3-analyticsaccelerator-s3.version>1.3.1</amazon-s3-analyticsaccelerator-s3.version>

Review Comment:
   split this into its own patch and update LICENSE-binary in it too



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -439,18 +455,27 @@ public void testVectorReadPastEOF() throws Throwable {
         byte[] buf = new byte[longLen];
         ByteBuffer bb = ByteBuffer.wrap(buf);
         final FileRange range = FileRange.createFileRange(0, longLen);
-        in.readVectored(Arrays.asList(range), (i) -> bb);
-        interceptFuture(EOFException.class,
-            "",
-            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
-            TimeUnit.SECONDS,
-            range.getData());
-        assertS3StreamClosed(in);
-        return "vector read past EOF with " + in;
+
+        // For AAL, if there is no eTag, the provided length will not be passed in, and a HEAD request will be made.
+        // AAL requires the etag to detect changes in the object and then do cache eviction if required.
+        if (isAnalyticsStream()) {
+          intercept(EOFException.class, () -> in.readVectored(Arrays.asList(range), (i) -> bb));

Review Comment:
   nit, split in.readVectored() onto the next line
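   
   i.e.
   ```java
   intercept(EOFException.class, () ->
       in.readVectored(Arrays.asList(range), (i) -> bb));
   ```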



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -261,10 +277,12 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
       }
     },
         always(),
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
+         // two GET calls were made, one for readFully,

Review Comment:
   nit, revert



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

