[
https://issues.apache.org/jira/browse/HADOOP-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750807#comment-17750807
]
ASF GitHub Bot commented on HADOOP-18184:
-----------------------------------------
ahmarsuhail commented on code in PR #5832:
URL: https://github.com/apache/hadoop/pull/5832#discussion_r1283192650
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java:
##########
@@ -392,16 +413,41 @@ private void readBlock(BufferData data, boolean
isPrefetch, BufferData.State...
ops.end(op);
}
- if (isPrefetch) {
- prefetchingStatistics.prefetchOperationCompleted();
- if (tracker != null) {
- tracker.close();
- }
+ // update the statistics
+ prefetchingStatistics.fetchOperationCompleted(isPrefetch,
bytesFetched);
+ if (tracker != null) {
Review Comment:
Potentially, this null check could be removed, along with the one on 404 for the tracker.
It used to be null for non-prefetching ops before, but it won't be null anymore.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -577,11 +614,32 @@ public static Configuration
prepareTestConfiguration(final Configuration conf) {
boolean prefetchEnabled =
getTestPropertyBool(conf, PREFETCH_ENABLED_KEY,
PREFETCH_ENABLED_DEFAULT);
- conf.setBoolean(PREFETCH_ENABLED_KEY, prefetchEnabled);
+ enablePrefetch(conf, prefetchEnabled);
return conf;
}
+ /**
+ * Unset base/bucket prefetch options and set to the supplied option instead.
+ * @param conf configuration
+ * @param prefetch prefetch option
+ * @return the modified configuration.
+ */
+ public static Configuration enablePrefetch(final Configuration conf, boolean
prefetch) {
Review Comment:
nit: renaming to `setPrefetchState` or something similar would improve readability.
`enablePrefetch`, at a glance, makes it seem like we're always enabling it.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+ private static final int BLOCK_SIZE = S_1K * 10;
+
+ private Path testFile;
+ private S3AFileSystem testFileSystem;
+ private int prefetchBlockSize;
+ private Configuration conf;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ private File tmpFileDir;
+
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+ tmpFileDir.delete();
+ tmpFileDir.mkdirs();
+
+ conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+ String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+ testFile = new Path(testFileUri);
+ testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+ prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY,
PREFETCH_BLOCK_DEFAULT_SIZE);
+ final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+ Assumptions.assumeThat(testFileStatus.getLen())
+ .describedAs("Test file %s is smaller than required size %d",
+ testFileStatus, prefetchBlockSize * 4)
+ .isGreaterThan(prefetchBlockSize);
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ enablePrefetch(conf, true);
+ disableFilesystemCaching(conf);
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ return conf;
+ }
+
+ @Override
+ public synchronized void teardown() throws Exception {
+ super.teardown();
+ tmpFileDir.delete();
+ File[] tmpFiles = tmpFileDir.listFiles();
+ if (tmpFiles != null) {
+ for (File filePath : tmpFiles) {
+ String path = filePath.getPath();
+ filePath.delete();
+ }
+ }
+ cleanupWithLogger(LOG, testFileSystem);
+ }
+
+ /**
+ * Test to verify the existence of the cache file.
+ * Tries to perform inputStream read and seek ops to make the prefetching
take place and
+ * asserts whether file with .bin suffix is present. It also verifies
certain file stats.
+ */
+ @Test
+ public void testCacheFileExistence() throws Throwable {
+ describe("Verify that FS cache files exist on local FS");
+ skipIfClientSideEncryption();
+
+ try (FSDataInputStream in = testFileSystem.open(testFile)) {
+ byte[] buffer = new byte[prefetchBlockSize];
+
+ in.read(buffer, 0, prefetchBlockSize - 10240);
+ assertCacheFileExists();
Review Comment:
shouldn't we add a seek and then a read here? Though I tried that locally
and the test still fails
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestInMemoryInputStream.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
+import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
+import static org.apache.hadoop.test.Sizes.S_16K;
+import static org.apache.hadoop.test.Sizes.S_1K;
+import static org.apache.hadoop.test.Sizes.S_4K;
+
+/**
+ * Test the prefetching input stream, validates that the
+ * S3AInMemoryInputStream is working as expected.
+ */
+public class ITestInMemoryInputStream extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestInMemoryInputStream.class);
+
+ // Size should be < block size so S3AInMemoryInputStream is used
+ private static final int SMALL_FILE_SIZE = S_16K;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+ enablePrefetch(conf, true);
+ return conf;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ private void printStreamStatistics(final FSDataInputStream in) {
+ LOG.info("Stream statistics\n{}",
+ ioStatisticsToPrettyString(in.getIOStatistics()));
+ }
+
+ @Test
+ public void testRandomReadSmallFile() throws Throwable {
+ describe("random read on a small file, uses S3AInMemoryInputStream");
+
+ byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+ Path smallFile = path("randomReadSmallFile");
+ ContractTestUtils.writeDataset(getFileSystem(), smallFile, data,
data.length, 16, true);
+
+ int expectedReadBytes = 0;
+ try (FSDataInputStream in = getFileSystem().open(smallFile)) {
+ IOStatistics ioStats = in.getIOStatistics();
+
+ byte[] buffer = new byte[SMALL_FILE_SIZE];
+
+ in.read(buffer, 0, S_4K);
+ expectedReadBytes += S_4K;
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+ expectedReadBytes);
+
+ in.seek(S_1K * 12);
+ in.read(buffer, 0, S_4K);
+ expectedReadBytes += S_4K;
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+ expectedReadBytes);
+ printStreamStatistics(in);
+
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
+ // the whole file is loaded
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE,
SMALL_FILE_SIZE);
+ // there is no prefetching
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED,
0);
+ // there is no caching
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_CACHE_ENABLED, 0);
+ // no prefetch ops, so no action_executor_acquired
+ assertThatStatisticMaximum(ioStats,
+ ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
+
+ // now read offset 0 again and again, expect no new costs
+ in.readFully(0, buffer);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+ expectedReadBytes += buffer.length;
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES,
+ expectedReadBytes);
+ // unbuffer
+ in.unbuffer();
Review Comment:
Since this test is getting quite big, it might be better to have a separate
test for unbuffer.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java:
##########
@@ -123,19 +142,34 @@ public void testReadLargeFileFully() throws Throwable {
verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
0);
}
+ printStreamStatistics(in);
// Assert that first block is read synchronously, following blocks are
prefetched
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
numBlocks - 1);
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
+
+ in.unbuffer();
+ // Verify that once stream is closed, all memory is freed
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
+ // prefetching is on
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCK_PREFETCH_ENABLED,
1);
+ // there is no caching
Review Comment:
there is caching? same comment for line 132
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+ private static final int BLOCK_SIZE = S_1K * 10;
+
+ private Path testFile;
+ private S3AFileSystem testFileSystem;
+ private int prefetchBlockSize;
+ private Configuration conf;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ private File tmpFileDir;
+
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+ tmpFileDir.delete();
+ tmpFileDir.mkdirs();
+
+ conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+ String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+ testFile = new Path(testFileUri);
+ testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+ prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY,
PREFETCH_BLOCK_DEFAULT_SIZE);
+ final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+ Assumptions.assumeThat(testFileStatus.getLen())
+ .describedAs("Test file %s is smaller than required size %d",
+ testFileStatus, prefetchBlockSize * 4)
+ .isGreaterThan(prefetchBlockSize);
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ enablePrefetch(conf, true);
+ disableFilesystemCaching(conf);
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ return conf;
+ }
+
+ @Override
+ public synchronized void teardown() throws Exception {
+ super.teardown();
+ tmpFileDir.delete();
+ File[] tmpFiles = tmpFileDir.listFiles();
+ if (tmpFiles != null) {
+ for (File filePath : tmpFiles) {
+ String path = filePath.getPath();
+ filePath.delete();
+ }
+ }
+ cleanupWithLogger(LOG, testFileSystem);
+ }
+
+ /**
+ * Test to verify the existence of the cache file.
+ * Tries to perform inputStream read and seek ops to make the prefetching
take place and
+ * asserts whether file with .bin suffix is present. It also verifies
certain file stats.
+ */
+ @Test
+ public void testCacheFileExistence() throws Throwable {
Review Comment:
Wondering if we can also add a test to check that caching gets disabled if it
takes too long... but not sure how to do it (or if it's possible).
Also a test that if the stream is unbuffered, the block doesn't get cached.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingLargeFiles.java:
##########
@@ -29,62 +29,77 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
-import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
import static
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_CACHE_ENABLED;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_PREFETCH_ENABLED;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ;
+import static org.apache.hadoop.test.Sizes.S_10M;
+import static org.apache.hadoop.test.Sizes.S_1K;
/**
* Test the prefetching input stream, validates that the underlying
S3ACachingInputStream and
* S3AInMemoryInputStream are working as expected.
*/
-public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
-
- public ITestS3APrefetchingInputStream() {
- super(true);
- }
+public class ITestS3APrefetchingLargeFiles extends AbstractS3ACostTest {
private static final Logger LOG =
- LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);
+ LoggerFactory.getLogger(ITestS3APrefetchingLargeFiles.class);
private static final int S_500 = 512;
- private static final int S_1K = S_500 * 2;
- private static final int S_1M = S_1K * S_1K;
+
private int numBlocks;
// Size should be > block size so S3ACachingInputStream is used
private long largeFileSize;
- // Size should be < block size so S3AInMemoryInputStream is used
- private static final int SMALL_FILE_SIZE = S_1K * 9;
-
private static final int TIMEOUT_MILLIS = 5000;
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
- S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
- S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ removeBaseAndBucketOverrides(conf,
+ PREFETCH_ENABLED_KEY, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
return conf;
}
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
private void createLargeFile() throws Exception {
Review Comment:
do you think it's worth following the same pattern as `AbstractS3ACostTest`,
which creates a huge file in a test, and then other tests assert that the file
exists. ITestInMemoryInputStream could extend it as well, and avoid creating
and tearing down the small file multiple times
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteInputStream.java:
##########
@@ -409,27 +455,54 @@ protected String getOffsetStr(long offset) {
return String.format("%d:%d", blockNumber, offset);
}
+ @Override
+ public synchronized void unbuffer() {
+ LOG.debug("{}: unbuffered", getName());
+ if (closeStream(true)) {
+ getS3AStreamStatistics().unbuffered();
+ }
+ }
+
+ /**
+ * Close the stream in close() or unbuffer().
+ * @param unbuffer is this an unbuffer operation?
+ * @return true if the stream was closed; false means it was already closed.
+ */
+ protected boolean closeStream(final boolean unbuffer) {
+
+ if (underlyingResourcesClosed.getAndSet(true)) {
+ return false;
+ }
+
+ if (unbuffer) {
Review Comment:
also, why just on unbuffer? shouldn't this be cleaned up on close() too?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
- super(context, s3Attributes, client, streamStatistics);
- this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
- int bufferPoolSize = this.numBlocksToPrefetch + 1;
- this.blockManager = this.createBlockManager(
- this.getContext().getFuturePool(),
- this.getReader(),
- this.getBlockData(),
- bufferPoolSize,
- conf,
- localDirAllocator);
+ super(context, s3Attributes, client, streamStatistics);
+ this.conf = conf;
+ this.localDirAllocator = localDirAllocator;
+ this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+ demandCreateBlockManager();
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})",
this.getName(),
fileSize);
+ streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,
Review Comment:
nevermind...since S3AInMemoryInputStream doesn't actually do any prefetching
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -76,36 +79,75 @@ public S3ACachingInputStream(
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
- super(context, s3Attributes, client, streamStatistics);
- this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
- int bufferPoolSize = this.numBlocksToPrefetch + 1;
- this.blockManager = this.createBlockManager(
- this.getContext().getFuturePool(),
- this.getReader(),
- this.getBlockData(),
- bufferPoolSize,
- conf,
- localDirAllocator);
+ super(context, s3Attributes, client, streamStatistics);
+ this.conf = conf;
+ this.localDirAllocator = localDirAllocator;
+ this.numBlocksToPrefetch = getContext().getPrefetchBlockCount();
+ demandCreateBlockManager();
int fileSize = (int) s3Attributes.getLen();
LOG.debug("Created caching input stream for {} (size = {})",
this.getName(),
fileSize);
+ streamStatistics.setPrefetchState(numBlocksToPrefetch > 0,
Review Comment:
why the `numBlocksToPrefetch > 0` though? numBlocksToPrefetch will always be
> 0 because in S3AFS we do `prefetchBlockCount = intOption(conf,
PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);`
> s3a prefetching stream to support unbuffer()
> --------------------------------------------
>
> Key: HADOOP-18184
> URL: https://issues.apache.org/jira/browse/HADOOP-18184
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Minor
> Labels: pull-request-available
>
> Apache Impala uses unbuffer() to free up all client side resources held by a
> stream, so allowing it to have a map of available (path -> stream) objects,
> retained across queries.
> This saves on having to reopen the files, with the cost of HEAD checks etc.
> S3AInputStream just closes its http connection. here there is a lot more
> state to discard, but all memory and file storage must be freed.
> until this done, ITestS3AContractUnbuffer must skip when the prefetch stream
> is used.
> its notable that the other tests don't fail, even though the stream doesn't
> implement the interface; the graceful degradation handles that. it should
> fail if the test xml resource says the stream does it, but that the stream
> capabilities say it doesn't.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]