[
https://issues.apache.org/jira/browse/HADOOP-18184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754223#comment-17754223
]
ASF GitHub Bot commented on HADOOP-18184:
-----------------------------------------
steveloughran commented on code in PR #5832:
URL: https://github.com/apache/hadoop/pull/5832#discussion_r1293843650
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/ITestS3APrefetchingCacheFiles.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.prefetch;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetch;
+import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.test.Sizes.S_1K;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+ private static final int BLOCK_SIZE = S_1K * 10;
+
+ private Path testFile;
+ private S3AFileSystem testFileSystem;
+ private int prefetchBlockSize;
+ private Configuration conf;
+
+ /**
+ * Thread level IOStatistics; reset in setup().
+ */
+ private IOStatisticsContext ioStatisticsContext;
+
+ private File tmpFileDir;
+
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ tmpFileDir = File.createTempFile("ITestS3APrefetchingCacheFiles", "");
+ tmpFileDir.delete();
+ tmpFileDir.mkdirs();
+
+ conf.set(BUFFER_DIR, tmpFileDir.getAbsolutePath());
+ String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+ testFile = new Path(testFileUri);
+ testFileSystem = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+
+ prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+ final FileStatus testFileStatus = testFileSystem.getFileStatus(testFile);
+ Assumptions.assumeThat(testFileStatus.getLen())
+ .describedAs("Test file %s is smaller than required size %d",
+ testFileStatus, prefetchBlockSize)
+ .isGreaterThan(prefetchBlockSize);
+
+ ioStatisticsContext = getCurrentIOStatisticsContext();
+ ioStatisticsContext.reset();
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ enablePrefetch(conf, true);
+ disableFilesystemCaching(conf);
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
+ conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ return conf;
+ }
+
+ @Override
+ public synchronized void teardown() throws Exception {
+ super.teardown();
+ // delete the cached files first, then the now-empty directory
+ File[] tmpFiles = tmpFileDir.listFiles();
+ if (tmpFiles != null) {
+ for (File tmpFile : tmpFiles) {
+ tmpFile.delete();
+ }
+ }
+ tmpFileDir.delete();
+ cleanupWithLogger(LOG, testFileSystem);
+ }
+
+ /**
+ * Test to verify the existence of the cache file.
+ * Performs input stream read and seek operations to trigger prefetching,
+ * then asserts that a cache file with the .bin suffix is present.
+ * It also verifies certain file stats.
+ */
+ @Test
+ public void testCacheFileExistence() throws Throwable {
+ describe("Verify that FS cache files exist on local FS");
+ skipIfClientSideEncryption();
+
+ try (FSDataInputStream in = testFileSystem.open(testFile)) {
+ byte[] buffer = new byte[prefetchBlockSize];
+
+ in.read(buffer, 0, prefetchBlockSize - 10240);
+ assertCacheFileExists();
Review Comment:
This turns out to be more complex than I'd thought, as there are some
long-standing behaviours in ensureCurrentBuffer which I suspect are broken.
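
For reference, the assertCacheFileExists() helper called in the excerpt above
lies outside the quoted diff. Below is a minimal sketch of what such a check
might look like, assuming (as the test javadoc describes) that the prefetching
stream writes cache files with a .bin suffix under the directory configured in
BUFFER_DIR; the real helper in the PR may differ.

  // Hypothetical sketch only: assert that at least one prefetch cache file
  // with a ".bin" suffix exists under the configured buffer directory.
  private void assertCacheFileExists() {
    String bufferDir = conf.get(BUFFER_DIR);
    Assertions.assertThat(bufferDir)
        .describedAs("%s is not set", BUFFER_DIR)
        .isNotEmpty();
    File cacheDir = new File(bufferDir);
    Assertions.assertThat(cacheDir)
        .describedAs("buffer directory %s", cacheDir)
        .isDirectory();
    File[] cacheFiles = cacheDir.listFiles((dir, name) -> name.endsWith(".bin"));
    Assertions.assertThat(cacheFiles)
        .describedAs("prefetch cache files under %s", cacheDir)
        .isNotEmpty();
  }
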
> s3a prefetching stream to support unbuffer()
> --------------------------------------------
>
> Key: HADOOP-18184
> URL: https://issues.apache.org/jira/browse/HADOOP-18184
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.0
> Reporter: Steve Loughran
> Assignee: Steve Loughran
> Priority: Minor
> Labels: pull-request-available
>
> Apache Impala uses unbuffer() to free up all client-side resources held by a
> stream, allowing it to keep a map of available (path -> stream) objects,
> retained across queries.
> This saves having to reopen the files, with the cost of HEAD checks etc.
> S3AInputStream just closes its HTTP connection. Here there is a lot more
> state to discard, but all memory and file storage must be freed.
> Until this is done, ITestS3AContractUnbuffer must skip when the prefetch
> stream is used.
> It's notable that the other tests don't fail, even though the stream doesn't
> implement the interface; the graceful degradation handles that. It should
> fail if the test XML resource says the stream supports unbuffer but the
> stream capabilities say it doesn't.
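
For readers unfamiliar with the API, here is a minimal sketch of the
caller-side pattern the description above refers to: caching open streams
keyed by path and calling unbuffer() between queries.
FSDataInputStream.unbuffer(), hasCapability() and StreamCapabilities.UNBUFFER
are existing Hadoop APIs; the StreamCache class, its method names and its
(non-existent) eviction policy are illustrative, not part of the patch.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

/** Illustrative sketch: caller-side cache of open streams, released via unbuffer(). */
class StreamCache {
  private final FileSystem fs;
  private final Map<Path, FSDataInputStream> streams = new HashMap<>();

  StreamCache(FileSystem fs) {
    this.fs = fs;
  }

  /** Get an open stream for the path, opening it only on first use. */
  FSDataInputStream get(Path path) throws IOException {
    FSDataInputStream in = streams.get(path);
    if (in == null) {
      in = fs.open(path);
      streams.put(path, in);
    }
    return in;
  }

  /**
   * Called between queries: release buffers, cache files and connections
   * held by each stream, but keep the streams open so the next query
   * avoids a reopen and its HEAD request.
   */
  void releaseAll() {
    for (FSDataInputStream in : streams.values()) {
      if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
        in.unbuffer();
      }
    }
  }
}
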
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]