This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f8fd632c0 [Feature] Enable the capability to specify zstd and lz4 segment compression via config (#14008)
1f8fd632c0 is described below

commit 1f8fd632c09d6835f42d71675277694234938934
Author: Jack Luo <jack....@mail.utoronto.ca>
AuthorDate: Tue Oct 22 04:13:32 2024 +0800

    [Feature] Enable the capability to specify zstd and lz4 segment compression via config (#14008)
    
    * Enable the capability to specify zstd and lz4 segment compression codec via config
    
    * Reduce the scope of the change to server-only
    
    * Add a blank line to trigger unit test again
    
    * Addressed code review comments.
---
 .../pinot/common/utils/TarCompressionUtils.java    | 41 ++++++++++++++++++----
 .../core/data/manager/BaseTableDataManager.java    |  4 +--
 .../realtime/RealtimeSegmentDataManager.java       |  5 +--
 .../data/manager/BaseTableDataManagerTest.java     | 10 +++---
 .../server/starter/helix/BaseServerStarter.java    |  7 ++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  2 ++
 6 files changed, 54 insertions(+), 15 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
index 089c0fae36..3a6f3170f0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java
@@ -69,6 +69,12 @@ public class TarCompressionUtils {
   private TarCompressionUtils() {
   }
 
+  /**
+   * This generic compressed tar file extension does not bind to a particular 
compressor. Decompression determines the
+   * appropriate compressor at run-time based on the file's magic number 
irrespective of the file extension.
+   * Compression uses the default compressor automatically if this generic 
extension is used.
+   */
+  public static final String TAR_COMPRESSED_FILE_EXTENSION = ".tar.compressed";
   public static final String TAR_GZ_FILE_EXTENSION = ".tar.gz";
   public static final String TAR_LZ4_FILE_EXTENSION = ".tar.lz4";
   public static final String TAR_ZST_FILE_EXTENSION = ".tar.zst";
@@ -77,6 +83,13 @@ public class TarCompressionUtils {
           CompressorStreamFactory.LZ4_FRAMED, TAR_ZST_FILE_EXTENSION, 
CompressorStreamFactory.ZSTANDARD);
   private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY = 
CompressorStreamFactory.getSingleton();
   private static final char ENTRY_NAME_SEPARATOR = '/';
+  private static String _defaultCompressorName = CompressorStreamFactory.GZIP;
+
+  /**
+   * Sets the compressor used when the output file carries the generic {@link #TAR_COMPRESSED_FILE_EXTENSION}.
+   * {@code COMPRESSOR_NAME_BY_FILE_EXTENSIONS} is keyed by file extension and its values are the compressor names,
+   * so the requested name must be validated against the values, not the keys. A name that is not among the
+   * supported compressors is ignored and the current default is kept.
+   *
+   * @param compressorName compressor name to use as the default, e.g. {@link CompressorStreamFactory#ZSTANDARD}
+   */
+  public static void setDefaultCompressor(String compressorName) {
+    if (COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsValue(compressorName)) {
+      _defaultCompressorName = compressorName;
+    }
+  }
 
   /**
    * Creates a compressed tar file from the input file/directory to the output 
file. The output file must have
@@ -93,15 +106,29 @@ public class TarCompressionUtils {
    */
   public static void createCompressedTarFile(File[] inputFiles, File 
outputFile)
       throws IOException {
-    String compressorName = null;
-    for (String supportedCompressorExtension : 
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
-      if (outputFile.getName().endsWith(supportedCompressorExtension)) {
-        compressorName = 
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
-        break;
+    if (outputFile.getName().endsWith(TAR_COMPRESSED_FILE_EXTENSION)) {
+      createCompressedTarFile(inputFiles, outputFile, _defaultCompressorName);
+    } else {
+      String compressorName = null;
+      for (String supportedCompressorExtension : 
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
+        if (outputFile.getName().endsWith(supportedCompressorExtension)) {
+          compressorName = 
COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
+          createCompressedTarFile(inputFiles, outputFile, compressorName);
+          return;
+        }
       }
+      Preconditions.checkState(null != compressorName,
+          "Output file: %s does not have a supported compressed tar file 
extension", outputFile);
     }
-    Preconditions.checkState(null != compressorName,
-        "Output file: %s does not have a supported compressed tar file 
extension", outputFile);
+  }
+
+  public static void createCompressedTarFile(File inputFile, File outputFile, 
String compressorName)
+      throws IOException {
+    createCompressedTarFile(new File[]{inputFile}, outputFile, compressorName);
+  }
+
+  public static void createCompressedTarFile(File[] inputFiles, File 
outputFile, String compressorName)
+      throws IOException {
     try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath());
         BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut);
         OutputStream compressorOut = 
COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 56d2cb35d6..b1d3647a54 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -793,7 +793,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
               failedAttempts.get());
         }
       } else {
-        File segmentTarFile = new File(tempRootDir, segmentName + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        File segmentTarFile = new File(tempRootDir, segmentName + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
         SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, 
segmentTarFile, zkMetadata.getCrypterName());
         _logger.info("Downloaded tarred segment: {} from: {} to: {}, file 
length: {}", segmentName, downloadUrl,
             segmentTarFile, segmentTarFile.length());
@@ -820,7 +820,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         _tableNameWithType);
     _logger.info("Downloading segment: {} from peers", segmentName);
     File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + 
UUID.randomUUID());
-    File segmentTarFile = new File(tempRootDir, segmentName + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File segmentTarFile = new File(tempRootDir, segmentName + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
     try {
       SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, 
_peerDownloadScheme, () -> {
         List<URI> peerServerURIs =
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index bed8f2a310..939e43d393 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1016,7 +1016,8 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       
_serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS,
 1L);
 
       final long lockAcquireTimeMillis = now();
-      // Build a segment from in-memory rows.If buildTgz is true, then build 
the tar.gz file as well
+      // Build a segment from in-memory rows.
+      // If build compressed archive is true, then build the tar.compressed 
file as well
       // TODO Use an auto-closeable object to delete temp resources.
       File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + 
_segmentNameStr + "-" + now());
 
@@ -1069,7 +1070,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
 
       if (forCommit) {
-        File segmentTarFile = new File(dataDir, _segmentNameStr + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+        File segmentTarFile = new File(dataDir, _segmentNameStr + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
         try {
           TarCompressionUtils.createCompressedTarFile(indexDir, 
segmentTarFile);
         } catch (IOException e) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index e40ef49131..7d351c486f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -293,7 +293,8 @@ public class BaseTableDataManagerTest {
       throws Exception {
     File indexDir = createSegment(SegmentVersion.v3, 5);
     SegmentZKMetadata zkMetadata =
-        makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
+        makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION),
+            false);
 
     // Same CRC but force to download.
     BaseTableDataManager tableDataManager = createTableManager();
@@ -567,7 +568,7 @@ public class BaseTableDataManagerTest {
     File tempDir = new File(TEMP_DIR, "testDownloadAndDecrypt");
     String fileName = "tmp.txt";
     FileUtils.write(new File(tempDir, fileName), "this is from somewhere 
remote");
-    String tarFileName = SEGMENT_NAME + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION;
+    String tarFileName = SEGMENT_NAME + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION;
     File tempTarFile = new File(TEMP_DIR, tarFileName);
     TarCompressionUtils.createCompressedTarFile(tempDir, tempTarFile);
 
@@ -607,7 +608,7 @@ public class BaseTableDataManagerTest {
     File tempRootDir = 
tableDataManager.getTmpSegmentDataDir("test-untar-move");
 
     // All input and intermediate files are put in the tempRootDir.
-    File tempTar = new File(tempRootDir, SEGMENT_NAME + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    File tempTar = new File(tempRootDir, SEGMENT_NAME + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
     File tempInputDir = new File(tempRootDir, "input");
     FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment 
dir");
     TarCompressionUtils.createCompressedTarFile(tempInputDir, tempTar);
@@ -687,7 +688,8 @@ public class BaseTableDataManagerTest {
   private static SegmentZKMetadata createRawSegment(SegmentVersion 
segmentVersion, int numRows)
       throws Exception {
     File indexDir = createSegment(segmentVersion, numRows);
-    return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
+    return makeRawSegment(indexDir,
+        new File(TEMP_DIR, SEGMENT_NAME + 
TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
   }
 
   private static SegmentZKMetadata makeRawSegment(File indexDir, File 
rawSegmentFile, boolean deleteIndexDir)
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index dc4100eebc..98f700c277 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -59,6 +59,7 @@ import org.apache.pinot.common.utils.PinotAppConfigs;
 import org.apache.pinot.common.utils.ServiceStartableUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.ServiceStatus.Status;
+import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.helix.HelixHelper;
@@ -161,6 +162,12 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
             CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));
 
+    String tarCompressionCodecName =
+        
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME);
+    if (null != tarCompressionCodecName) {
+      TarCompressionUtils.setDefaultCompressor(tarCompressionCodecName);
+    }
+
     setupHelixSystemProperties();
     _listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);
     _hostname = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ff81f6bc4e..f62efb2062 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -65,6 +65,8 @@ public class CommonConstants {
   public static final String CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS = 
"pinot.executors.fixed.default.numThreads";
   public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1";
 
+  public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME = 
"pinot.tar.compression.codec.name";
+
   /**
    * The state of the consumer for a given segment
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to