This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b43aef  cleanup tar.gz segment files on job exit (#6385)
6b43aef is described below

commit 6b43aef4c9d4bd558c29ca3b68552f91bcaff693
Author: Karthik Amarnath <kaamarn...@linkedin.com>
AuthorDate: Tue Dec 29 12:03:43 2020 -0800

    cleanup tar.gz segment files on job exit (#6385)
    
    * cleanup tar.gz post upload complete.
    
    * cleanup tar.gz segment files on job exit.
---
 .../ingestion/batch/common/SegmentPushUtils.java   | 44 +++++++++++++++-------
 .../batch/spec/SegmentGenerationJobSpec.java       | 13 +++++++
 2 files changed, 44 insertions(+), 13 deletions(-)
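
For context, a minimal caller sketch (hedged: only setCleanUpOutputDir/isCleanUpOutputDir and the
finally-block cleanup come from this commit; the helper method name and wiring below are illustrative):

    import java.util.List;
    import org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils;
    import org.apache.pinot.spi.filesystem.PinotFS;
    import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

    // Hypothetical caller: push tar.gz segments and have them cleaned up afterwards.
    void pushWithCleanup(SegmentGenerationJobSpec jobSpec, PinotFS outputDirFS, List<String> tarFilePaths)
        throws Exception {
      jobSpec.setCleanUpOutputDir(true);  // new flag added by this commit; a plain boolean, defaults to false
      SegmentPushUtils.pushSegments(jobSpec, outputDirFS, tarFilePaths);
      // With the flag set, each tar.gz URI is deleted in the finally block added below,
      // whether the push succeeds or throws.
    }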

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 71ccfbe..4c3b41b 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.spec.Constants;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
@@ -77,7 +78,8 @@ public class SegmentPushUtils implements Serializable {
        return new URI(scheme, fileURI.getUserInfo(), host, port, fileURI.getPath(), fileURI.getQuery(),
            fileURI.getFragment());
      } catch (URISyntaxException e) {
-        LOGGER.warn("Unable to generate push uri based from dir URI: {} and file URI: {}, directly return file URI.", dirURI, fileURI);
+        LOGGER.warn("Unable to generate push uri based from dir URI: {} and file URI: {}, directly return file URI.",
+            dirURI, fileURI);
         return fileURI;
       }
     }
@@ -87,6 +89,7 @@ public class SegmentPushUtils implements Serializable {
  public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
+    boolean cleanUpOutputDir = spec.isCleanUpOutputDir();
     LOGGER.info("Start pushing segments: {}... to locations: {} for table {}",
        Arrays.toString(tarFilePaths.subList(0, Math.min(5, tarFilePaths.size())).toArray()),
         Arrays.toString(spec.getPinotClusterSpecs()), tableName);
@@ -134,6 +137,10 @@ public class SegmentPushUtils implements Serializable {
                       segmentName, controllerURI, e);
               throw e;
             }
+          } finally {
+            if (cleanUpOutputDir) {
+              fileSystem.delete(tarFileURI, true);
+            }
           }
         });
       }
@@ -147,6 +154,8 @@ public class SegmentPushUtils implements Serializable {
        Arrays.toString(segmentUris.subList(0, Math.min(5, segmentUris.size())).toArray()),
         Arrays.toString(spec.getPinotClusterSpecs()));
     for (String segmentUri : segmentUris) {
+      URI segmentURI = URI.create(segmentUri);
+      PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
       for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
         URI controllerURI;
         try {
@@ -183,6 +192,10 @@ public class SegmentPushUtils implements Serializable {
                   tableName, segmentUri, controllerURI, e);
               throw e;
             }
+          } finally {
+            if (spec.isCleanUpOutputDir()) {
+              outputDirFS.delete(segmentURI, true);
+            }
           }
         });
       }
@@ -202,11 +215,11 @@ public class SegmentPushUtils implements Serializable {
   * @param segmentUriToTarPathMap contains the map of segment DownloadURI to segment tar file path
   * @throws Exception
   */
-  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, Map<String, String> segmentUriToTarPathMap)
+  public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap)
       throws Exception {
     String tableName = spec.getTableSpec().getTableName();
-    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}",
-        segmentUriToTarPathMap,
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", segmentUriToTarPathMap,
         Arrays.toString(spec.getPinotClusterSpecs()), tableName);
     for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
       String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
@@ -233,15 +246,17 @@ public class SegmentPushUtils implements Serializable {
           }
          RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> {
            try {
-              List<Header> headers = ImmutableList.of(
-                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
-                  new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, FileUploadDownloadClient.FileUploadType.METADATA.toString()));
+              List<Header> headers = ImmutableList
+                  .of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath),
+                      new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+                          FileUploadDownloadClient.FileUploadType.METADATA.toString()));
               // Add table name as a request parameter
               NameValuePair tableNameValuePair =
                   new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
               List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
-              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI),
-                  segmentName, segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
+              SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
+                  .uploadSegmentMetadata(FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName,
+                      segmentMetadataFile, headers, parameters, FILE_UPLOAD_DOWNLOAD_CLIENT.DEFAULT_SOCKET_TIMEOUT_MS);
               LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName,
                   controllerURI, response.getStatusCode(), response.getResponse());
               return true;
@@ -268,12 +283,13 @@ public class SegmentPushUtils implements Serializable {
     }
   }
 
-  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix, String[] files) {
+  public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, String uriPrefix, String uriSuffix,
+      String[] files) {
     Map<String, String> segmentUriToTarPathMap = new HashMap<>();
     for (String file : files) {
       URI uri = URI.create(file);
       if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix,uriSuffix);
+        URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, uriPrefix, uriSuffix);
         segmentUriToTarPathMap.put(updatedURI.toString(), file);
       }
     }
@@ -293,7 +309,8 @@ public class SegmentPushUtils implements Serializable {
  private static File generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileURI)
      throws Exception {
    String uuid = UUID.randomUUID().toString();
-    File tarFile = new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tarFile =
+        new File(FileUtils.getTempDirectory(), "segmentTar-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
    File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + uuid);
     try {
       fileSystem.copyToLocalFile(tarFileURI, tarFile);
@@ -312,7 +329,8 @@ public class SegmentPushUtils implements Serializable {
      TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.SEGMENT_CREATION_META,
          new File(segmentMetadataDir, V1Constants.SEGMENT_CREATION_META));
 
-      File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), "segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(),
+          "segmentMetadata-" + uuid + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
       if (segmentMetadataTarFile.exists()) {
         FileUtils.forceDelete(segmentMetadataTarFile);
       }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index ab199bc..740beaf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -111,6 +111,11 @@ public class SegmentGenerationJobSpec implements Serializable {
    */
   private PushJobSpec _pushJobSpec;
 
+  /**
+   * Should clean up output segment on job completion.
+   */
+  private boolean _cleanUpOutputDir;
+
   public ExecutionFrameworkSpec getExecutionFrameworkSpec() {
     return _executionFrameworkSpec;
   }
@@ -247,6 +252,14 @@ public class SegmentGenerationJobSpec implements Serializable {
   public void setSegmentCreationJobParallelism(int segmentCreationJobParallelism) {
     _segmentCreationJobParallelism = segmentCreationJobParallelism;
   }
+
+  public void setCleanUpOutputDir(boolean cleanUpOutputDir) {
+    _cleanUpOutputDir = cleanUpOutputDir;
+  }
+
+  public boolean isCleanUpOutputDir() {
+    return _cleanUpOutputDir;
+  }
 }
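
For readers skimming the patch, the cleanup added to the URI-push path boils down to the following
fragment, condensed from the hunks above (the retry/push logic is elided and unchanged):

    // Resolve a PinotFS for the segment URI's scheme, then delete the tar.gz in a finally
    // block so it is removed whether the push succeeds or fails.
    URI segmentURI = URI.create(segmentUri);
    PinotFS outputDirFS = PinotFSFactory.create(segmentURI.getScheme());
    try {
      // ... push segmentURI to each controller with retries ...
    } finally {
      if (spec.isCleanUpOutputDir()) {
        outputDirFS.delete(segmentURI, true);
      }
    }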
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org