This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6b43aef cleanup tar.gz segment files on job exit (#6385) 6b43aef is described below commit 6b43aef4c9d4bd558c29ca3b68552f91bcaff693 Author: Karthik Amarnath <kaamarn...@linkedin.com> AuthorDate: Tue Dec 29 12:03:43 2020 -0800 cleanup tar.gz segment files on job exit (#6385) * cleanup tar.gz post upload complete. * cleanup tar.gz segment files on job exit. --- .../ingestion/batch/common/SegmentPushUtils.java | 44 +++++++++++++++------- .../batch/spec/SegmentGenerationJobSpec.java | 13 +++++++ 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java index 71ccfbe..4c3b41b 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java @@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.spec.Constants; import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; @@ -77,7 +78,8 @@ public class SegmentPushUtils implements Serializable { return new URI(scheme, fileURI.getUserInfo(), host, port, fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment()); } catch (URISyntaxException e) { - LOGGER.warn("Unable to generate push uri based from dir URI: {} and file URI: {}, directly return file URI.", dirURI, fileURI); + LOGGER.warn("Unable to generate push uri based from dir URI: {} and file URI: {}, directly return file URI.", + dirURI, fileURI); return fileURI; } } @@ -87,6 +89,7 @@ public class SegmentPushUtils implements Serializable { public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths) throws RetriableOperationException, AttemptsExceededException { String tableName = spec.getTableSpec().getTableName(); + boolean cleanUpOutputDir = spec.isCleanUpOutputDir(); LOGGER.info("Start pushing segments: {}... to locations: {} for table {}", Arrays.toString(tarFilePaths.subList(0, Math.min(5, tarFilePaths.size())).toArray()), Arrays.toString(spec.getPinotClusterSpecs()), tableName); @@ -134,6 +137,10 @@ public class SegmentPushUtils implements Serializable { segmentName, controllerURI, e); throw e; } + } finally { + if (cleanUpOutputDir) { + fileSystem.delete(tarFileURI, true); + } } }); } @@ -147,6 +154,8 @@ public class SegmentPushUtils implements Serializable { Arrays.toString(segmentUris.subList(0, Math.min(5, segmentUris.size())).toArray()), Arrays.toString(spec.getPinotClusterSpecs())); for (String segmentUri : segmentUris) { + URI segmentURI = URI.create(segmentUri); + PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme()); for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { URI controllerURI; try { @@ -183,6 +192,10 @@ public class SegmentPushUtils implements Serializable { tableName, segmentUri, controllerURI, e); throw e; } + } finally { + if (spec.isCleanUpOutputDir()) { + outputDirFS.delete(segmentURI, true); + } } }); } @@ -202,11 +215,11 @@ public class SegmentPushUtils implements Serializable { * @param segmentUriToTarPathMap contains the map of segment DownloadURI to segment tar file path * @throws Exception */ - public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap) + public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, + Map<String, String> segmentUriToTarPathMap) throws Exception { String tableName = spec.getTableSpec().getTableName(); - LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", - segmentUriToTarPathMap, + LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()), tableName); for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath); @@ -233,15 +246,17 @@ public class SegmentPushUtils implements Serializable { } RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { try { - List<Header> headers = ImmutableList.of( - new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath), - new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString())); + List<Header> headers = ImmutableList + .of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath), + new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString())); // Add table name as a request parameter NameValuePair tableNameValuePair = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName); List<NameValuePair> parameters = Arrays.asList(tableNameValuePair); - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), - segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS); + SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT + .uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, + segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS); LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, controllerURI, response.getStatusCode(), response.getResponse()); return true; @@ -268,12 +283,13 @@ public class SegmentPushUtils implements Serializable { } } - public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) { + public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, + String[] files) { Map<String, String> segmentUriToTarPathMap = new HashMap<>(); for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix); + URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix, uriSuffix); segmentUriToTarPathMap.put(updatedURI.toString(), file); } } @@ -293,7 +309,8 @@ public class SegmentPushUtils implements Serializable { private static File generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI) throws Exception { String uuid = UUID.randomUUID().toString(); - File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tarFile = + new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + uuid); try { fileSystem.copyToLocalFile(tarFileURI, tarFile); @@ -312,7 +329,8 @@ public class SegmentPushUtils implements Serializable { TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.SEGMENT_CREATION_META, new File(segmentMetadataDir, V1Constants.SEGMENT_CREATION_META)); - File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), "segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), + "segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); if (segmentMetadataTarFile.exists()) { FileUtils.forceDelete(segmentMetadataTarFile); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java index ab199bc..740beaf 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java @@ -111,6 +111,11 @@ public class SegmentGenerationJobSpec implements Serializable { */ private PushJobSpec _pushJobSpec; + /** + * Should clean up output segment on job completion. + */ + private boolean _cleanUpOutputDir; + public ExecutionFrameworkSpec getExecutionFrameworkSpec() { return _executionFrameworkSpec; } @@ -247,6 +252,14 @@ public class SegmentGenerationJobSpec implements Serializable { public void setSegmentCreationJobParallelism(int segmentCreationJobParallelism) { _segmentCreationJobParallelism = segmentCreationJobParallelism; } + + public void setCleanUpOutputDir(boolean cleanUpOutputDir) { + _cleanUpOutputDir = cleanUpOutputDir; + } + + public boolean isCleanUpOutputDir() { + return _cleanUpOutputDir; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org