This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1382d29 Disable recursion in PinotFS copy (#8162) 1382d29 is described below commit 1382d293236d40e80d1652729fdb1632f92f7e48 Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Tue Feb 8 15:10:30 2022 -0800 Disable recursion in PinotFS copy (#8162) * Disable recursion in PinotFS copy All uses of PinotFS copy API involve copying a tarred segment and untarring it. So, copying a directory recursively will not work (the untar will fail). It also results in wastage of effort in copying across file systems. Also disabled the file scheme during segment upload on the controller, since the URL-based upload is meant to provide an external URL to be picked up by the controller. * Removed redundant comment * Fix to add a different API for recursive copy * Fix lint errors --- .../utils/fetcher/SegmentFetcherFactory.java | 1 + .../PinotSegmentUploadDownloadRestletResource.java | 5 ++++ .../hadoop/HadoopSegmentGenerationJobRunner.java | 6 ++--- .../pinot/plugin/filesystem/HadoopPinotFS.java | 15 +++++++++++ .../apache/pinot/spi/filesystem/LocalPinotFS.java | 25 ++++++++++++++----- .../org/apache/pinot/spi/filesystem/PinotFS.java | 16 ++++++++++-- .../pinot/spi/filesystem/LocalPinotFSTest.java | 29 ---------------------- 7 files changed, 57 insertions(+), 40 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java index c6d71e8..743cd1b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java @@ -144,6 +144,7 @@ public class SegmentFetcherFactory { private void fetchSegmentToLocalInternal(URI uri, File dest) throws Exception { + // caller untars 
getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 484bc30..253a3fb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -381,6 +381,11 @@ public class PinotSegmentUploadDownloadRestletResource { } LOGGER.info("Downloading segment from {} to {} for table {}", currentSegmentLocationURI, destFile.getAbsolutePath(), tableName); + URI uri = new URI(currentSegmentLocationURI); + if (uri.getScheme().equalsIgnoreCase("file")) { + throw new ControllerApplicationException(LOGGER, "Unsupported URI: " + currentSegmentLocationURI, + Response.Status.BAD_REQUEST); + } SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, destFile); } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index 4cbfdf5..0608fd7 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -259,7 +259,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge 
job.getConfiguration().setBoolean(MRJobConfig.MAP_SPECULATIVE, false); // But we have to copy ourselves to HDFS, and add us to the distributed cache, so - // that the mapper code is available. + // that the mapper code is available. addMapperJarToDistributedCache(job, outputDirFS, stagingDirURI); org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration(); @@ -277,7 +277,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge // In order to ensure pinot plugins would be loaded to each worker, this method // tars entire plugins directory and set this file into Distributed cache. // Then each mapper job will untar the plugin tarball, and set system properties accordingly. - // Note that normally we'd just use Hadoop's support for putting jars on the + // Note that normally we'd just use Hadoop's support for putting jars on the // classpath via the distributed cache, but some of the plugins (e.g. the pinot-parquet // input format) include Hadoop classes, which can be incompatibile with the Hadoop // installation/jars being used to run the mapper, leading to errors such as: @@ -386,7 +386,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge throws Exception { File ourJar = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI()); Path distributedCacheJar = new Path(stagingDirURI.toString(), ourJar.getName()); - outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri()); + outputDirFS.copyFromLocalDir(ourJar, distributedCacheJar.toUri()); job.addFileToClassPath(distributedCacheJar); } diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java index 760c6d2..5136c10 100644 --- a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java +++ 
b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java @@ -183,6 +183,9 @@ public class HadoopPinotFS extends BasePinotFS { if (_hadoopFS == null) { throw new RuntimeException("_hadoopFS client is not initialized when trying to copy files"); } + if (_hadoopFS.isDirectory(remoteFile)) { + throw new IllegalArgumentException(srcUri.toString() + " is a direactory"); + } long startMs = System.currentTimeMillis(); _hadoopFS.copyToLocalFile(remoteFile, localFile); LOGGER.debug("copied {} from hdfs to {} in local for size {}, take {} ms", srcUri, dstFilePath, dstFile.length(), @@ -196,9 +199,21 @@ public class HadoopPinotFS extends BasePinotFS { @Override public void copyFromLocalFile(File srcFile, URI dstUri) throws Exception { + if (srcFile.isDirectory()) { + throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is a direactory"); + } _hadoopFS.copyFromLocalFile(new Path(srcFile.toURI()), new Path(dstUri)); } + public void copyFromLocalDir(File srcFile, URI dstUri) + throws Exception { + Path srcPath = new Path(srcFile.toURI()); + if (!_hadoopFS.isDirectory(srcPath)) { + throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is not a directory"); + } + _hadoopFS.copyFromLocalFile(srcPath, new Path(dstUri)); + } + @Override public boolean isDirectory(URI uri) { try { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java index cca7a15..4a87f3b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java @@ -85,7 +85,7 @@ public class LocalPinotFS extends BasePinotFS { @Override public boolean copy(URI srcUri, URI dstUri) throws IOException { - copy(toFile(srcUri), toFile(dstUri)); + copy(toFile(srcUri), toFile(dstUri), false); return true; } @@ -118,13 +118,22 @@ public class 
LocalPinotFS extends BasePinotFS { @Override public void copyToLocalFile(URI srcUri, File dstFile) throws Exception { - copy(toFile(srcUri), dstFile); + copy(toFile(srcUri), dstFile, false); } @Override public void copyFromLocalFile(File srcFile, URI dstUri) throws Exception { - copy(srcFile, toFile(dstUri)); + copy(srcFile, toFile(dstUri), false); + } + + @Override + public void copyFromLocalDir(File srcFile, URI dstUri) + throws Exception { + if (!srcFile.isDirectory()) { + throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is not a directory"); + } + copy(srcFile, toFile(dstUri), true); } @Override @@ -163,14 +172,18 @@ public class LocalPinotFS extends BasePinotFS { } } - private static void copy(File srcFile, File dstFile) + private static void copy(File srcFile, File dstFile, boolean recursive) throws IOException { if (dstFile.exists()) { FileUtils.deleteQuietly(dstFile); } if (srcFile.isDirectory()) { - // Throws Exception on failure - FileUtils.copyDirectory(srcFile, dstFile); + if (recursive) { + FileUtils.copyDirectory(srcFile, dstFile); + } else { + // Throws Exception on failure + throw new IOException(srcFile.getAbsolutePath() + " is a directory"); + } } else { // Will create parent directories, throws Exception on failure FileUtils.copyFile(srcFile, dstFile); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java index 17d4032..af0d45b 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java @@ -143,7 +143,7 @@ public interface PinotFS extends Closeable, Serializable { /** * Copies a file from a remote filesystem to the local one. Keeps the original file. 
- * @param srcUri location of current file on remote filesystem + * @param srcUri location of current file on remote filesystem (must not be a directory) * @param dstFile location of destination on local filesystem * @throws Exception if srcUri is not valid or not present, or timeout when downloading file to local */ @@ -151,9 +151,21 @@ public interface PinotFS extends Closeable, Serializable { throws Exception; /** + * @apiNote This API is to be used with caution, since recursive copies can lead to adverse situations. + * + * Add srcFile to filesystem at the given dst name and the source is kept intact afterwards. + * @param srcFile location of src file on local disk (must be a directory) + * @param dstUri location of dst on remote filesystem + * @throws Exception if fileUri is not valid or not present, or timeout when uploading file from local + */ + default void copyFromLocalDir(File srcFile, URI dstUri) + throws Exception { + throw new UnsupportedOperationException("Recursive copy not supported"); + } + /** * The src file is on the local disk. Add it to filesystem at the given dst name and the source is kept intact * afterwards. - * @param srcFile location of src file on local disk + * @param srcFile location of src file on local disk (must not be a directory) * @param dstUri location of dst on remote filesystem * @throws Exception if fileUri is not valid or not present, or timeout when uploading file from local */ diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java index 7f6e387..c4bc5d2 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java @@ -205,28 +205,6 @@ public class LocalPinotFSTest { // Expected. 
} - // Check that directory only copy worked - localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI()); - Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI())); - - // Copying directory with files to directory with files - File testFile = new File(firstTempDir, "testFile"); - Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath()); - File newTestFile = new File(secondTempDir, "newTestFile"); - Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath()); - - localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI()); - Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), true).length, 1); - - // Copying directory with files under another directory. - File firstTempDirUnderSecondTempDir = new File(secondTempDir, firstTempDir.getName()); - localPinotFS.copy(firstTempDir.toURI(), firstTempDirUnderSecondTempDir.toURI()); - Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI())); - // There're two files/directories under secondTempDir. - Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 2); - // The file under src directory also got copied under dst directory. 
- Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(), true).length, 1); - // len of dir = exception try { localPinotFS.length(firstTempDir.toURI()); @@ -234,12 +212,5 @@ public class LocalPinotFSTest { } catch (IllegalArgumentException e) { } - - Assert.assertTrue(testFile.exists()); - - localPinotFS.copyFromLocalFile(testFile, secondTestFileUri); - Assert.assertTrue(localPinotFS.exists(secondTestFileUri)); - localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri)); - Assert.assertTrue(localPinotFS.exists(secondTestFileUri)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org