This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b58514c799 Create metadata only tarball for metadata push job (#10034)
b58514c799 is described below

commit b58514c7995b0489e560a6bfd76ec2eaa0300bbf
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Wed Jan 4 23:29:38 2023 -0800

    Create metadata only tarball for metadata push job (#10034)
---
 .../batch/common/SegmentGenerationJobUtils.java    | 46 ++++++++++++++++++++++
 .../batch/hadoop/HadoopSegmentCreationMapper.java  | 24 ++++++++---
 .../spark/SparkSegmentGenerationJobRunner.java     | 34 ++++++++++------
 .../spark3/SparkSegmentGenerationJobRunner.java    | 32 +++++++++------
 .../standalone/SegmentGenerationJobRunner.java     |  1 +
 .../segment/local/utils/SegmentPushUtils.java      | 21 +++++++++-
 .../pinot/spi/ingestion/batch/spec/Constants.java  |  1 +
 .../spi/ingestion/batch/spec/PushJobSpec.java      | 14 +++++++
 .../batch/spec/SegmentGenerationJobSpec.java       | 13 ++++++
 .../batch/spec/SegmentGenerationTaskSpec.java      | 10 +++++
 10 files changed, 166 insertions(+), 30 deletions(-)
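
For context, the new switches introduced by this commit are plain bean properties on the ingestion specs: SegmentGenerationJobSpec.setCreateMetadataTarGz(...) makes the generation job emit a <segmentName>.metadata.tar.gz next to each segment tarball, and PushJobSpec.setPreferMetadataTarGz(...) (default true) lets the metadata push reuse that small tarball instead of downloading the full segment. A minimal sketch of enabling them programmatically follows; the class name is hypothetical and the rest of the job wiring is omitted.

import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;

public class MetadataTarballFlagsSketch {
  public static void main(String[] args) {
    // Generation side: ask the batch job to also produce <segmentName>.metadata.tar.gz.
    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
    jobSpec.setCreateMetadataTarGz(true);

    // Push side: prefer the metadata-only tarball when it already exists in the output directory.
    // This is already the default value; it is set explicitly here only for illustration.
    PushJobSpec pushJobSpec = new PushJobSpec();
    pushJobSpec.setPreferMetadataTarGz(true);
  }
}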

diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
index 1d192f01ff..269f2f39e2 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java
@@ -18,8 +18,23 @@
  */
 package org.apache.pinot.plugin.ingestion.batch.common;
 
+import java.io.File;
 import java.io.Serializable;
+import java.net.URI;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 @SuppressWarnings("serial")
@@ -27,6 +42,8 @@ public class SegmentGenerationJobUtils implements Serializable {
   private SegmentGenerationJobUtils() {
   }
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobUtils.class);
+
   /**
   * Always use local directory sequence id unless explicitly config: "use.global.directory.sequence.id".
    *
@@ -46,4 +63,33 @@ public class SegmentGenerationJobUtils implements Serializable {
     }
     return Boolean.parseBoolean(useGlobalDirectorySequenceId);
   }
+
+  public static void createSegmentMetadataTarGz(File localSegmentDir, File localMetadataTarFile)
+      throws Exception {
+    List<File> metadataFiles = new ArrayList<>();
+    Files.walkFileTree(localSegmentDir.toPath(), new SimpleFileVisitor<Path>() {
+      @Override
+      public FileVisitResult visitFile(Path file, java.nio.file.attribute.BasicFileAttributes attrs) {
+        if (file.getFileName().toString().equals(V1Constants.MetadataKeys.METADATA_FILE_NAME)
+            || file.getFileName().toString().equals(V1Constants.SEGMENT_CREATION_META)) {
+          metadataFiles.add(file.toFile());
+        }
+        return FileVisitResult.CONTINUE;
+      }
+    });
+    LOGGER.info("Tarring metadata files from: [{}] to: {}", metadataFiles, 
localMetadataTarFile);
+    TarGzCompressionUtils.createTarGzFile(metadataFiles.toArray(new File[0]), 
localMetadataTarFile);
+  }
+
+  public static void moveLocalTarFileToRemote(File localMetadataTarFile, URI outputMetadataTarURI, boolean overwrite)
+      throws Exception {
+    LOGGER.info("Trying to move metadata tar file from: [{}] to [{}]", localMetadataTarFile, outputMetadataTarURI);
+    PinotFS outputPinotFS = PinotFSFactory.create(outputMetadataTarURI.getScheme());
+    if (!overwrite && outputPinotFS.exists(outputMetadataTarURI)) {
+      LOGGER.warn("Not overwrite existing output metadata tar file: {}", outputPinotFS.exists(outputMetadataTarURI));
+    } else {
+      outputPinotFS.copyFromLocalFile(localMetadataTarFile, outputMetadataTarURI);
+    }
+    FileUtils.deleteQuietly(localMetadataTarFile);
+  }
 }
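
The two helpers above are what the Hadoop and Spark runners further down in this commit call back to back. A rough sketch of that flow, assuming the caller already has the untarred segment directory and a local temp output directory (class and method names here are illustrative, not part of the commit):

import java.io.File;
import java.net.URI;
import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;

public class MetadataTarballFlowSketch {
  // Tars only the segment metadata files out of the untarred segment directory and copies the
  // resulting <segmentName>.metadata.tar.gz to the output PinotFS. The real runners additionally
  // URL-encode the file name before resolving it against the output URI.
  public static void pushMetadataTarball(File localSegmentDir, File localOutputTempDir, String segmentName,
      URI outputDirURI, boolean overwriteOutput) throws Exception {
    File localMetadataTarFile = new File(localOutputTempDir, segmentName + Constants.METADATA_TAR_GZ_FILE_EXT);
    SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
    SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile,
        outputDirURI.resolve(localMetadataTarFile.getName()), overwriteOutput);
  }
}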
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
index c503e42f63..3fb658faae 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
@@ -183,13 +184,26 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long
       LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
           DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
       //move segment to output PinotFS
-      URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-          .resolve(segmentTarFileName);
-      LOGGER.info("Copying segment tar file from [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
-      outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+      URI relativeOutputPath = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI);
+      URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName);
+      SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI,
+          _spec.isOverwriteOutput());
+
+      // Create and upload segment metadata tar file
+      String metadataTarFileName = URLEncoder.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT, "UTF-8");
+      URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName);
+      if (outputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput() || !_spec.isCreateMetadataTarGz())) {
+        LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI);
+        outputDirFS.delete(outputMetadataTarURI, true);
+      }
 
+      if (taskSpec.isCreateMetadataTarGz()) {
+        File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName);
+        SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
+        SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI,
+            _spec.isOverwriteOutput());
+      }
       FileUtils.deleteQuietly(localSegmentDir);
-      FileUtils.deleteQuietly(localSegmentTarFile);
       FileUtils.deleteQuietly(localInputDataFile);
 
       context.write(new LongWritable(idx), new Text(segmentTarFileName));
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
index ceaf2b1b9c..204884ab8d 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java
@@ -275,6 +275,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           taskSpec.setSequenceId(idx);
           taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
           taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment());
+          taskSpec.setCreateMetadataTarGz(_spec.isCreateMetadataTarGz());
           taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
           SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
@@ -290,20 +291,29 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
           LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
               DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-          //move segment to output PinotFS
-          URI outputSegmentTarURI =
-              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI)
-                  .resolve(segmentTarFileName);
-          LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
-          if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme())
-              .exists(outputSegmentTarURI)) {
-            LOGGER.warn("Not overwrite existing output segment tar file: {}",
-                finalOutputDirFS.exists(outputSegmentTarURI));
-          } else {
-            finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+          // Move segment to output PinotFS
+          URI relativeOutputPath =
+              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI);
+          URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName);
+          SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI,
+              _spec.isOverwriteOutput());
+
+          // Create and upload segment metadata tar file
+          String metadataTarFileName = URLEncoder.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT, "UTF-8");
+          URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName);
+
+          if (finalOutputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput()
+              || !_spec.isCreateMetadataTarGz())) {
+            LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI);
+            finalOutputDirFS.delete(outputMetadataTarURI, true);
+          }
+          if (taskSpec.isCreateMetadataTarGz()) {
+            File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName);
+            SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
+            SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI,
+                _spec.isOverwriteOutput());
           }
           FileUtils.deleteQuietly(localSegmentDir);
-          FileUtils.deleteQuietly(localSegmentTarFile);
           FileUtils.deleteQuietly(localInputDataFile);
         }
       });
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
index 46db5818a2..d595da66b5 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java
@@ -289,20 +289,28 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri
           long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
           LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
               DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-          //move segment to output PinotFS
-          URI outputSegmentTarURI =
-              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI)
-                  .resolve(segmentTarFileName);
-          LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI);
-          if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme())
-              .exists(outputSegmentTarURI)) {
-            LOGGER.warn("Not overwrite existing output segment tar file: {}",
-                finalOutputDirFS.exists(outputSegmentTarURI));
-          } else {
-            finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
+          // Move segment to output PinotFS
+          URI relativeOutputPath =
+              SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI);
+          URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName);
+          SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI,
+              _spec.isOverwriteOutput());
+
+          // Create and upload segment metadata tar file
+          String metadataTarFileName = URLEncoder.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT, "UTF-8");
+          URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName);
+          if (finalOutputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput()
+              || !_spec.isCreateMetadataTarGz())) {
+            LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI);
+            finalOutputDirFS.delete(outputMetadataTarURI, true);
+          }
+          if (taskSpec.isCreateMetadataTarGz()) {
+            File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName);
+            SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile);
+            SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI,
+                _spec.isOverwriteOutput());
           }
           FileUtils.deleteQuietly(localSegmentDir);
-          FileUtils.deleteQuietly(localSegmentTarFile);
           FileUtils.deleteQuietly(localInputDataFile);
         }
       });
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
index dfea6efb68..9743166077 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
@@ -244,6 +244,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner {
     taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
     taskSpec.setSequenceId(seqId);
     taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment());
+    taskSpec.setCreateMetadataTarGz(_spec.isCreateMetadataTarGz());
     taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
 
     // If there's already been a failure, log and skip this file. Do this check right before the
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 92832cf889..207227a0be 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -285,7 +285,22 @@ public class SegmentPushUtils implements Serializable {
       String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
           ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName;
       SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
-      File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      File segmentMetadataFile;
+      // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote
+      // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment,
+      // which requires downloading the entire segment tar gz file.
+      URI metadataTarGzFilePath = URI.create(
+          new File(tarFilePath).getParentFile() + File.separator + segmentName + Constants.METADATA_TAR_GZ_FILE_EXT);
+      if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) {
+        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+            "segmentMetadata-" + UUID.randomUUID() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        if (segmentMetadataFile.exists()) {
+          FileUtils.forceDelete(segmentMetadataFile);
+        }
+        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
+      } else {
+        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath));
+      }
       try {
         for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
           URI controllerURI;
@@ -357,6 +372,10 @@ public class SegmentPushUtils implements Serializable {
       }
 
       URI uri = URI.create(file);
+      if (uri.getPath().endsWith(Constants.METADATA_TAR_GZ_FILE_EXT)) {
+        // Skip segment metadata tar gz files
+        continue;
+      }
       if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
         URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, pushSpec.getSegmentUriPrefix(),
             pushSpec.getSegmentUriSuffix());
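
The lookup above boils down to a sibling-path derivation: for a segment tarball at <outputDir>/<segmentName>.tar.gz the push job first checks for <outputDir>/<segmentName>.metadata.tar.gz and only falls back to downloading the whole segment if that file is missing. A small illustrative sketch of just that derivation follows; the class and method names are not part of the commit.

import java.io.File;
import java.net.URI;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;

public class MetadataTarPathSketch {
  // Mirrors the sibling-path computation used in SegmentPushUtils above.
  static URI metadataTarGzSibling(String tarFilePath, String segmentName) {
    return URI.create(
        new File(tarFilePath).getParentFile() + File.separator + segmentName + Constants.METADATA_TAR_GZ_FILE_EXT);
  }

  public static void main(String[] args) {
    // Prints /data/output/mySegment.metadata.tar.gz for a local output directory.
    System.out.println(metadataTarGzSibling("/data/output/mySegment.tar.gz", "mySegment"));
  }
}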
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java
index 168de41552..053be04cd5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java
@@ -26,4 +26,5 @@ public class Constants {
   * By default Pinot segments are compressed in 'tar.gz' format then pushed to controller.
    */
   public static final String TAR_GZ_FILE_EXT = ".tar.gz";
+  public static final String METADATA_TAR_GZ_FILE_EXT = ".metadata.tar.gz";
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index 04481baf1f..31d1ce8448 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -61,6 +61,20 @@ public class PushJobSpec implements Serializable {
    */
   private String _pushFileNamePattern;
 
+  /**
+   * Prefer using segment metadata tar gz file to push segment if exists.
+   */
+  private boolean _preferMetadataTarGz = true;
+
+  public boolean isPreferMetadataTarGz() {
+    return _preferMetadataTarGz;
+  }
+
+  public PushJobSpec setPreferMetadataTarGz(boolean preferMetadataTarGz) {
+    _preferMetadataTarGz = preferMetadataTarGz;
+    return this;
+  }
+
   public String getPushFileNamePattern() {
     return _pushFileNamePattern;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
index 02e52b26c1..291fd0c422 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java
@@ -139,6 +139,11 @@ public class SegmentGenerationJobSpec implements Serializable {
    */
   private String _authToken;
 
+  /**
+   * Create a separated metadata only tar gz file to reduce the data transfer of segment metadata push job.
+   */
+  private boolean _createMetadataTarGz;
+
   public ExecutionFrameworkSpec getExecutionFrameworkSpec() {
     return _executionFrameworkSpec;
   }
@@ -311,6 +316,14 @@ public class SegmentGenerationJobSpec implements Serializable {
     _authToken = authToken;
   }
 
+  public boolean isCreateMetadataTarGz() {
+    return _createMetadataTarGz;
+  }
+
+  public void setCreateMetadataTarGz(boolean createMetadataTarGz) {
+    _createMetadataTarGz = createMetadataTarGz;
+  }
+
   public String toJSONString(boolean removeSensitiveKeys) {
     ObjectNode jsonNode = (ObjectNode) JsonUtils.objectToJsonNode(this);
     if (removeSensitiveKeys) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
index 7705cd00df..f069833f39 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java
@@ -70,6 +70,8 @@ public class SegmentGenerationTaskSpec implements Serializable {
 
   private boolean _failOnEmptySegment = false;
 
+  private boolean _createMetadataTarGz = false;
+
   /**
    * Custom properties set into segment metadata
    */
@@ -139,6 +141,14 @@ public class SegmentGenerationTaskSpec implements Serializable {
     _failOnEmptySegment = failOnEmptySegment;
   }
 
+  public boolean isCreateMetadataTarGz() {
+    return _createMetadataTarGz;
+  }
+
+  public void setCreateMetadataTarGz(boolean createMetadataTarGz) {
+    _createMetadataTarGz = createMetadataTarGz;
+  }
+
   public void setCustomProperty(String key, String value) {
     if (!key.startsWith(CUSTOM_PREFIX)) {
       key = CUSTOM_PREFIX + key;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
