kkrugler commented on a change in pull request #6506: URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568254949
########## File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java ########## @@ -269,33 +300,80 @@ public void run() throw new RuntimeException("Job failed: " + job); } - LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI, + LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI, outputDirURI); - outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI); + moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput()); } finally { LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI); outputDirFS.delete(stagingDirURI, true); } } + /** + * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir. + * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise + * log a warning and continue. We assume that source and destination directories are on the same filesystem, + * so that move() can be used. + * + * @param fs + * @param sourceDir + * @param destDir + * @param overwrite + * @throws IOException + * @throws URISyntaxException + */ + private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException { + for (String sourcePath : fs.listFiles(sourceDir, true)) { + URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir); + String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath()); + URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename); + + if (!overwrite && fs.exists(destFileUri)) { + LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri); Review comment: Good idea, let me look into that ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org