This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 82bdda5a38 Parallelize segment metadata file generation. (#15030)
82bdda5a38 is described below

commit 82bdda5a3836dd7b0bca3c6497772b6b7837e05d
Author: Abhishek Bafna <aba...@startree.ai>
AuthorDate: Fri Feb 14 11:39:37 2025 +0530

    Parallelize segment metadata file generation. (#15030)
    
    * Parallelize segment metadata file generation.
    
    * Improved error handling and unit tests.
    
    * Introduce segmentMetadataGenerationParallelism config.
    
    * Move executor shutdown past thread completion.
    
    * Code refactor - pass executor as parameter.
    
    ---------
    
    Co-authored-by: abhishekbafna <abhishek.ba...@startree.ai>
---
 .../segment/local/utils/SegmentPushUtils.java      | 127 ++++++++++++++-------
 .../segment/local/utils/SegmentPushUtilsTest.java  |  81 ++++++++++++-
 .../spi/ingestion/batch/spec/PushJobSpec.java      |  14 +++
 3 files changed, 177 insertions(+), 45 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
index 5403575684..d4e39a11a9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java
@@ -31,10 +31,16 @@ import java.nio.file.PathMatcher;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.core5.http.Header;
@@ -373,47 +379,24 @@ public class SegmentPushUtils implements Serializable {
       Map<String, String> segmentUriToTarPathMap, List<Header> headers, 
List<NameValuePair> parameters)
       throws Exception {
     String tableName = spec.getTableSpec().getTableName();
-    Map<String, File> segmentMetadataFileMap = new HashMap<>();
-    List<String> segmentURIs = new ArrayList<>();
-    LOGGER.info("Start pushing segment metadata: {} to locations: {} for 
table: {}", segmentUriToTarPathMap,
-        Arrays.toString(spec.getPinotClusterSpecs()), tableName);
-    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
-      String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
-      String fileName = new File(tarFilePath).getName();
-      // segments stored in Pinot deep store do not have .tar.gz extension
-      String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
-          ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
-      SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
-      File segmentMetadataFile;
-      // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the remote
-      // directory. This is to avoid generating a new segment metadata tar gz 
file every time we push a segment,
-      // which requires downloading the entire segment tar gz file.
-
-      URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, 
segmentName);
-      LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
-      if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
-        segmentMetadataFile = new File(FileUtils.getTempDirectory(),
-            SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + 
UUID.randomUUID()
-                + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
-        if (segmentMetadataFile.exists()) {
-          FileUtils.forceDelete(segmentMetadataFile);
-        }
-        fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile);
-      } else {
-        segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
-      }
-      segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
-      segmentURIs.add(segmentName);
-      segmentURIs.add(segmentUriPath);
-    }
-
-    File allSegmentsMetadataTarFile = 
createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap);
+    ConcurrentHashMap<String, File> segmentMetadataFileMap = new 
ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>();
     Map<String, File> allSegmentsMetadataMap = new HashMap<>();
-    // the key is unused in batch upload mode and hence 'noopKey'
-    allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+    File allSegmentsMetadataTarFile = null;
+    int nThreads = 
spec.getPushJobSpec().getSegmentMetadataGenerationParallelism();
+    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    LOGGER.info("Start pushing segment metadata: {} to locations: {} for 
table: {} with parallelism: {}",
+        segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()), 
tableName,
+        spec.getPushJobSpec().getPushParallelism());
 
-    // perform metadata push in batch mode for every cluster
     try {
+      generateSegmentMetadataFiles(spec, fileSystem, segmentUriToTarPathMap, 
segmentMetadataFileMap, segmentURIs,
+          executor);
+      allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs, 
segmentMetadataFileMap);
+      // the key is unused in batch upload mode and hence 'noopKey'
+      allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile);
+
+      // perform metadata push in batch mode for every cluster
       for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) {
         URI controllerURI;
         try {
@@ -458,10 +441,70 @@ public class SegmentPushUtils implements Serializable {
         });
       }
     } finally {
-      for (Map.Entry<String, File> metadataFileEntry: 
segmentMetadataFileMap.entrySet()) {
+      for (Map.Entry<String, File> metadataFileEntry : 
segmentMetadataFileMap.entrySet()) {
         FileUtils.deleteQuietly(metadataFileEntry.getValue());
       }
-      FileUtils.forceDelete(allSegmentsMetadataTarFile);
+      if (allSegmentsMetadataTarFile != null) {
+        FileUtils.deleteQuietly(allSegmentsMetadataTarFile);
+      }
+      executor.shutdown();
+    }
+  }
+
+  @VisibleForTesting
+  static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec, 
PinotFS fileSystem,
+      Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, 
File> segmentMetadataFileMap,
+      ConcurrentLinkedQueue<String> segmentURIs, ExecutorService executor) {
+
+    List<Future<Void>> futures = new ArrayList<>();
+    // Generate segment metadata files in parallel
+    for (String segmentUriPath : segmentUriToTarPathMap.keySet()) {
+      futures.add(
+          executor.submit(() -> {
+            String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath);
+            String fileName = new File(tarFilePath).getName();
+            // segments stored in Pinot deep store do not have .tar.gz 
extension
+            String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT)
+                ? fileName.substring(0, fileName.length() - 
Constants.TAR_GZ_FILE_EXT.length()) : fileName;
+            SegmentNameUtils.validatePartialOrFullSegmentName(segmentName);
+            File segmentMetadataFile;
+            // Check if there is a segment metadata tar gz file named 
`segmentName.metadata.tar.gz`, already in the
+            // remote directory. This is to avoid generating a new segment 
metadata tar gz file every time we push a
+            // segment, which requires downloading the entire segment tar gz 
file.
+
+            URI metadataTarGzFilePath = 
generateSegmentMetadataURI(tarFilePath, segmentName);
+            LOGGER.info("Checking if metadata tar gz file {} exists", 
metadataTarGzFilePath);
+            if (spec.getPushJobSpec().isPreferMetadataTarGz() && 
fileSystem.exists(metadataTarGzFilePath)) {
+              segmentMetadataFile = new File(FileUtils.getTempDirectory(),
+                  SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + 
UUID.randomUUID()
+                      + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
+              if (segmentMetadataFile.exists()) {
+                FileUtils.forceDelete(segmentMetadataFile);
+              }
+              fileSystem.copyToLocalFile(metadataTarGzFilePath, 
segmentMetadataFile);
+            } else {
+              segmentMetadataFile = generateSegmentMetadataFile(fileSystem, 
URI.create(tarFilePath));
+            }
+            segmentMetadataFileMap.put(segmentName, segmentMetadataFile);
+            segmentURIs.add(segmentName);
+            segmentURIs.add(segmentUriPath);
+            return null;
+          }));
+    }
+    int errorCount = 0;
+    Exception exception = null;
+    for (Future<Void> future : futures) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        errorCount++;
+        exception = e;
+      }
+    }
+    if (errorCount > 0) {
+      throw new RuntimeException(
+          String.format("%d out of %d segment metadata generation failed", 
errorCount, segmentUriToTarPathMap.size()),
+          exception);
     }
   }
 
@@ -483,13 +526,13 @@ public class SegmentPushUtils implements Serializable {
   // Additionally, it contains a segmentName to segmentDownloadURI mapping 
file which allows us to avoid sending the
   // segmentDownloadURI as a header field as there are limitations on the 
number of headers allowed in the http request.
   @VisibleForTesting
-  static File createSegmentsMetadataTarFile(List<String> segmentURIs, 
Map<String, File> segmentMetadataFileMap)
+  static File createSegmentsMetadataTarFile(Collection<String> segmentURIs, 
Map<String, File> segmentMetadataFileMap)
       throws IOException {
     String uuid = UUID.randomUUID().toString();
     File allSegmentsMetadataDir =
         new File(FileUtils.getTempDirectory(), 
SegmentUploadConstants.ALL_SEGMENTS_METADATA_DIR_PREFIX + uuid);
     FileUtils.forceMkdir(allSegmentsMetadataDir);
-    for (Map.Entry<String, File> segmentMetadataTarFileEntry: 
segmentMetadataFileMap.entrySet()) {
+    for (Map.Entry<String, File> segmentMetadataTarFileEntry : 
segmentMetadataFileMap.entrySet()) {
       String segmentName = segmentMetadataTarFileEntry.getKey();
       File tarFile = segmentMetadataTarFileEntry.getValue();
       TarCompressionUtils.untarOneFile(tarFile, 
V1Constants.MetadataKeys.METADATA_FILE_NAME,
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
index 1fb5cfd43a..8a86283e15 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java
@@ -28,17 +28,26 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
 import org.apache.hc.core5.http.NameValuePair;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
@@ -47,13 +56,15 @@ public class SegmentPushUtilsTest {
   private File _tempDir;
 
   @BeforeMethod
-  public void setUp() throws IOException {
+  public void setUp()
+      throws IOException {
     _tempDir = new File(FileUtils.getTempDirectory(), "test-" + 
UUID.randomUUID());
     FileUtils.forceMkdir(_tempDir);
   }
 
   @AfterMethod
-  public void tearDown() throws IOException {
+  public void tearDown()
+      throws IOException {
     FileUtils.deleteDirectory(_tempDir);
   }
 
@@ -122,7 +133,8 @@ public class SegmentPushUtilsTest {
   }
 
   @Test
-  public void testCreateSegmentsMetadataTarFile() throws IOException {
+  public void testCreateSegmentsMetadataTarFile()
+      throws IOException {
     // setup
    List<String> segmentURIs = Arrays.asList("http://example.com/segment1", 
"http://example.com/segment2");
 
@@ -156,4 +168,67 @@ public class SegmentPushUtilsTest {
     assertTrue(result.exists(), "The resulting tar.gz file should exist");
     assertTrue(result.getName().endsWith(".tar.gz"), "The resulting file 
should have a .tar.gz extension");
   }
+
+  @Test
+  public void testGenerateSegmentMetadataFiles()
+      throws Exception {
+    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    pushJobSpec.setSegmentMetadataGenerationParallelism(2);
+    jobSpec.setPushJobSpec(pushJobSpec);
+    PinotFS mockFs = Mockito.mock(PinotFS.class);
+    Map<String, String> segmentUriToTarPathMap = Map.of(
+        "segment1", "segment1.tar.gz",
+        "segment2", "segment2.tar.gz"
+    );
+    ConcurrentHashMap<String, File> segmentMetadataFileMap = new 
ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>();
+
+    when(mockFs.exists(any(URI.class))).thenReturn(true);
+    mockFs.copyToLocalFile(any(URI.class), any(File.class));
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      SegmentPushUtils.generateSegmentMetadataFiles(jobSpec, mockFs, 
segmentUriToTarPathMap, segmentMetadataFileMap,
+          segmentURIs, executor);
+    } finally {
+      executor.shutdown();
+    }
+
+    assertEquals(segmentMetadataFileMap.size(), 2);
+    assertEquals(segmentURIs.size(), 4);
+  }
+
+  @Test
+  public void testGenerateSegmentMetadataFilesFailure()
+      throws Exception {
+    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
+    PushJobSpec pushJobSpec = new PushJobSpec();
+    jobSpec.setPushJobSpec(pushJobSpec);
+    PinotFS mockFs = Mockito.mock(PinotFS.class);
+    Map<String, String> segmentUriToTarPathMap = Map.of(
+        "segment1", "segment1.tar.gz",
+        "segment2", "segment2.tar.gz"
+    );
+    ConcurrentHashMap<String, File> segmentMetadataFileMap = new 
ConcurrentHashMap<>();
+    ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>();
+
+    when(mockFs.exists(any(URI.class))).thenReturn(true);
+    doNothing().doThrow(new RuntimeException("test exception"))
+        .when(mockFs)
+        .copyToLocalFile(any(URI.class), any(File.class));
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      SegmentPushUtils.generateSegmentMetadataFiles(jobSpec, mockFs, 
segmentUriToTarPathMap, segmentMetadataFileMap,
+          segmentURIs, executor);
+    } catch (Exception e) {
+      assertEquals(e.getMessage(), "1 out of 2 segment metadata generation 
failed");
+    } finally {
+      executor.shutdown();
+    }
+
+    assertEquals(segmentMetadataFileMap.size(), 1);
+    assertEquals(segmentURIs.size(), 2);
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
index c087e364cd..d67f8d1785 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java
@@ -53,6 +53,12 @@ public class PushJobSpec implements Serializable {
    */
   private boolean _batchSegmentUpload;
 
+  /**
+   * Applicable for METADATA push type.
+   * Number of threads to use for segment metadata generation.
+   */
+  private int _segmentMetadataGenerationParallelism = 1;
+
   /**
    * Used in SegmentUriPushJobRunner, which is used to composite the segment 
uri to send to pinot controller.
    * The URI sends to controller is in the format 
${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
@@ -163,4 +169,12 @@ public class PushJobSpec implements Serializable {
   public void setBatchSegmentUpload(boolean batchSegmentUpload) {
     _batchSegmentUpload = batchSegmentUpload;
   }
+
+  public int getSegmentMetadataGenerationParallelism() {
+    return _segmentMetadataGenerationParallelism;
+  }
+
+  public void setSegmentMetadataGenerationParallelism(int 
segmentMetadataGenerationParallelism) {
+    _segmentMetadataGenerationParallelism = 
segmentMetadataGenerationParallelism;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to