This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b58514c799 Create metadata only tarball for metadata push job (#10034) b58514c799 is described below commit b58514c7995b0489e560a6bfd76ec2eaa0300bbf Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Wed Jan 4 23:29:38 2023 -0800 Create metadata only tarball for metadata push job (#10034) --- .../batch/common/SegmentGenerationJobUtils.java | 46 ++++++++++++++++++++++ .../batch/hadoop/HadoopSegmentCreationMapper.java | 24 ++++++++--- .../spark/SparkSegmentGenerationJobRunner.java | 34 ++++++++++------ .../spark3/SparkSegmentGenerationJobRunner.java | 32 +++++++++------ .../standalone/SegmentGenerationJobRunner.java | 1 + .../segment/local/utils/SegmentPushUtils.java | 21 +++++++++- .../pinot/spi/ingestion/batch/spec/Constants.java | 1 + .../spi/ingestion/batch/spec/PushJobSpec.java | 14 +++++++ .../batch/spec/SegmentGenerationJobSpec.java | 13 ++++++ .../batch/spec/SegmentGenerationTaskSpec.java | 10 +++++ 10 files changed, 166 insertions(+), 30 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java index 1d192f01ff..269f2f39e2 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationJobUtils.java @@ -18,8 +18,23 @@ */ package org.apache.pinot.plugin.ingestion.batch.common; +import java.io.File; import java.io.Serializable; +import java.net.URI; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.ingestion.batch.spec.SegmentNameGeneratorSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("serial") @@ -27,6 +42,8 @@ public class SegmentGenerationJobUtils implements Serializable { private SegmentGenerationJobUtils() { } + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationJobUtils.class); + /** * Always use local directory sequence id unless explicitly config: "use.global.directory.sequence.id". * @@ -46,4 +63,33 @@ public class SegmentGenerationJobUtils implements Serializable { } return Boolean.parseBoolean(useGlobalDirectorySequenceId); } + + public static void createSegmentMetadataTarGz(File localSegmentDir, File localMetadataTarFile) + throws Exception { + List<File> metadataFiles = new ArrayList<>(); + Files.walkFileTree(localSegmentDir.toPath(), new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, java.nio.file.attribute.BasicFileAttributes attrs) { + if (file.getFileName().toString().equals(V1Constants.MetadataKeys.METADATA_FILE_NAME) + || file.getFileName().toString().equals(V1Constants.SEGMENT_CREATION_META)) { + metadataFiles.add(file.toFile()); + } + return FileVisitResult.CONTINUE; + } + }); + LOGGER.info("Tarring metadata files from: [{}] to: {}", metadataFiles, localMetadataTarFile); + TarGzCompressionUtils.createTarGzFile(metadataFiles.toArray(new File[0]), localMetadataTarFile); + } + + public static void moveLocalTarFileToRemote(File localMetadataTarFile, URI outputMetadataTarURI, boolean overwrite) + throws Exception { + LOGGER.info("Trying to move metadata tar file from: [{}] to [{}]", localMetadataTarFile, outputMetadataTarURI); + PinotFS outputPinotFS = PinotFSFactory.create(outputMetadataTarURI.getScheme()); + if (!overwrite && outputPinotFS.exists(outputMetadataTarURI)) { + LOGGER.warn("Not overwrite existing output metadata tar file: {}", outputPinotFS.exists(outputMetadataTarURI)); + } else { + outputPinotFS.copyFromLocalFile(localMetadataTarFile, outputMetadataTarURI); + } + FileUtils.deleteQuietly(localMetadataTarFile); + } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java index c503e42f63..3fb658faae 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentCreationMapper.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.pinot.common.segment.generation.SegmentGenerationUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; +import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationJobUtils; import org.apache.pinot.plugin.ingestion.batch.common.SegmentGenerationTaskRunner; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; @@ -183,13 +184,26 @@ public class HadoopSegmentCreationMapper extends Mapper<LongWritable, Text, Long LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); //move segment to output PinotFS - URI outputSegmentTarURI = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI) - .resolve(segmentTarFileName); - LOGGER.info("Copying segment tar file from [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); - outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + URI relativeOutputPath = SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI); + URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName); + SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI, + _spec.isOverwriteOutput()); + + // Create and upload segment metadata tar file + String metadataTarFileName = URLEncoder.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT, "UTF-8"); + URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName); + if (outputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput() || !_spec.isCreateMetadataTarGz())) { + LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI); + outputDirFS.delete(outputMetadataTarURI, true); + } + if (taskSpec.isCreateMetadataTarGz()) { + File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName); + SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile); + SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI, + _spec.isOverwriteOutput()); + } FileUtils.deleteQuietly(localSegmentDir); - FileUtils.deleteQuietly(localSegmentTarFile); FileUtils.deleteQuietly(localInputDataFile); context.write(new LongWritable(idx), new Text(segmentTarFileName)); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index ceaf2b1b9c..204884ab8d 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -275,6 +275,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri taskSpec.setSequenceId(idx); taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec()); taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment()); + taskSpec.setCreateMetadataTarGz(_spec.isCreateMetadataTarGz()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec); @@ -290,20 +291,29 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); - //move segment to output PinotFS - URI outputSegmentTarURI = - SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI) - .resolve(segmentTarFileName); - LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); - if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme()) - .exists(outputSegmentTarURI)) { - LOGGER.warn("Not overwrite existing output segment tar file: {}", - finalOutputDirFS.exists(outputSegmentTarURI)); - } else { - finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + // Move segment to output PinotFS + URI relativeOutputPath = + SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI); + URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName); + SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI, + _spec.isOverwriteOutput()); + + // Create and upload segment metadata tar file + String metadataTarFileName = URLEncoder.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT, "UTF-8"); + URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName); + + if (finalOutputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput() + || !_spec.isCreateMetadataTarGz())) { + LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI); + finalOutputDirFS.delete(outputMetadataTarURI, true); + } + if (taskSpec.isCreateMetadataTarGz()) { + File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName); + SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile); + SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI, + _spec.isOverwriteOutput()); } FileUtils.deleteQuietly(localSegmentDir); - FileUtils.deleteQuietly(localSegmentTarFile); FileUtils.deleteQuietly(localInputDataFile); } }); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java index 46db5818a2..d595da66b5 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentGenerationJobRunner.java @@ -289,20 +289,28 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile); LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName, DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize)); - //move segment to output PinotFS - URI outputSegmentTarURI = - SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI) - .resolve(segmentTarFileName); - LOGGER.info("Trying to move segment tar file from: [{}] to [{}]", localSegmentTarFile, outputSegmentTarURI); - if (!_spec.isOverwriteOutput() && PinotFSFactory.create(outputSegmentTarURI.getScheme()) - .exists(outputSegmentTarURI)) { - LOGGER.warn("Not overwrite existing output segment tar file: {}", - finalOutputDirFS.exists(outputSegmentTarURI)); - } else { - finalOutputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); + // Move segment to output PinotFS + URI relativeOutputPath = + SegmentGenerationUtils.getRelativeOutputPath(finalInputDirURI, inputFileURI, finalOutputDirURI); + URI outputSegmentTarURI = relativeOutputPath.resolve(segmentTarFileName); + SegmentGenerationJobUtils.moveLocalTarFileToRemote(localSegmentTarFile, outputSegmentTarURI, + _spec.isOverwriteOutput()); + + // Create and upload segment metadata tar file + String metadataTarFileName = URLEncoder.encode(segmentName + Constants.METADATA_TAR_GZ_FILE_EXT, "UTF-8"); + URI outputMetadataTarURI = relativeOutputPath.resolve(metadataTarFileName); + if (finalOutputDirFS.exists(outputMetadataTarURI) && (_spec.isOverwriteOutput() + || !_spec.isCreateMetadataTarGz())) { + LOGGER.info("Deleting existing metadata tar gz file: {}", outputMetadataTarURI); + finalOutputDirFS.delete(outputMetadataTarURI, true); + } + if (taskSpec.isCreateMetadataTarGz()) { + File localMetadataTarFile = new File(localOutputTempDir, metadataTarFileName); + SegmentGenerationJobUtils.createSegmentMetadataTarGz(localSegmentDir, localMetadataTarFile); + SegmentGenerationJobUtils.moveLocalTarFileToRemote(localMetadataTarFile, outputMetadataTarURI, + _spec.isOverwriteOutput()); } FileUtils.deleteQuietly(localSegmentDir); - FileUtils.deleteQuietly(localSegmentTarFile); FileUtils.deleteQuietly(localInputDataFile); } }); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java index dfea6efb68..9743166077 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java @@ -244,6 +244,7 @@ public class SegmentGenerationJobRunner implements IngestionJobRunner { taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath()); taskSpec.setSequenceId(seqId); taskSpec.setFailOnEmptySegment(_spec.isFailOnEmptySegment()); + taskSpec.setCreateMetadataTarGz(_spec.isCreateMetadataTarGz()); taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString()); // If there's already been a failure, log and skip this file. Do this check right before the diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 92832cf889..207227a0be 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -285,7 +285,22 @@ public class SegmentPushUtils implements Serializable { String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT) ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName; SegmentNameUtils.validatePartialOrFullSegmentName(segmentName); - File segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + File segmentMetadataFile; + // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote + // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment, + // which requires downloading the entire segment tar gz file. + URI metadataTarGzFilePath = URI.create( + new File(tarFilePath).getParentFile() + File.separator + segmentName + Constants.METADATA_TAR_GZ_FILE_EXT); + if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) { + segmentMetadataFile = new File(FileUtils.getTempDirectory(), + "segmentMetadata-" + UUID.randomUUID() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + if (segmentMetadataFile.exists()) { + FileUtils.forceDelete(segmentMetadataFile); + } + fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile); + } else { + segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + } try { for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { URI controllerURI; @@ -357,6 +372,10 @@ public class SegmentPushUtils implements Serializable { } URI uri = URI.create(file); + if (uri.getPath().endsWith(Constants.METADATA_TAR_GZ_FILE_EXT)) { + // Skip segment metadata tar gz files + continue; + } if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { URI updatedURI = SegmentPushUtils.generateSegmentTarURI(outputDirURI, uri, pushSpec.getSegmentUriPrefix(), pushSpec.getSegmentUriSuffix()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java index 168de41552..053be04cd5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/Constants.java @@ -26,4 +26,5 @@ public class Constants { * By default Pinot segments are compressed in 'tar.gz' format then pushed to controller. */ public static final String TAR_GZ_FILE_EXT = ".tar.gz"; + public static final String METADATA_TAR_GZ_FILE_EXT = ".metadata.tar.gz"; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java index 04481baf1f..31d1ce8448 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java @@ -61,6 +61,20 @@ public class PushJobSpec implements Serializable { */ private String _pushFileNamePattern; + /** + * Prefer using segment metadata tar gz file to push segment if exists. + */ + private boolean _preferMetadataTarGz = true; + + public boolean isPreferMetadataTarGz() { + return _preferMetadataTarGz; + } + + public PushJobSpec setPreferMetadataTarGz(boolean preferMetadataTarGz) { + _preferMetadataTarGz = preferMetadataTarGz; + return this; + } + public String getPushFileNamePattern() { return _pushFileNamePattern; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java index 02e52b26c1..291fd0c422 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java @@ -139,6 +139,11 @@ public class SegmentGenerationJobSpec implements Serializable { */ private String _authToken; + /** + * Create a separated metadata only tar gz file to reduce the data transfer of segment metadata push job. + */ + private boolean _createMetadataTarGz; + public ExecutionFrameworkSpec getExecutionFrameworkSpec() { return _executionFrameworkSpec; } @@ -311,6 +316,14 @@ public class SegmentGenerationJobSpec implements Serializable { _authToken = authToken; } + public boolean isCreateMetadataTarGz() { + return _createMetadataTarGz; + } + + public void setCreateMetadataTarGz(boolean createMetadataTarGz) { + _createMetadataTarGz = createMetadataTarGz; + } + public String toJSONString(boolean removeSensitiveKeys) { ObjectNode jsonNode = (ObjectNode) JsonUtils.objectToJsonNode(this); if (removeSensitiveKeys) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java index 7705cd00df..f069833f39 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationTaskSpec.java @@ -70,6 +70,8 @@ public class SegmentGenerationTaskSpec implements Serializable { private boolean _failOnEmptySegment = false; + private boolean _createMetadataTarGz = false; + /** * Custom properties set into segment metadata */ @@ -139,6 +141,14 @@ public class SegmentGenerationTaskSpec implements Serializable { _failOnEmptySegment = failOnEmptySegment; } + public boolean isCreateMetadataTarGz() { + return _createMetadataTarGz; + } + + public void setCreateMetadataTarGz(boolean createMetadataTarGz) { + _createMetadataTarGz = createMetadataTarGz; + } + public void setCustomProperty(String key, String value) { if (!key.startsWith(CUSTOM_PREFIX)) { key = CUSTOM_PREFIX + key; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org