This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 82bdda5a38 Parallelize segment metadata file generation. (#15030) 82bdda5a38 is described below commit 82bdda5a3836dd7b0bca3c6497772b6b7837e05d Author: Abhishek Bafna <aba...@startree.ai> AuthorDate: Fri Feb 14 11:39:37 2025 +0530 Parallelize segment metadata file generation. (#15030) * Parallelize segment metadata file generation. * Improved error handling and unit tests. * Introduce segmentMetadataGenerationParallelism config. * Move executor shutdown past thread completion. * Code refactor - pass executor as parameter. --------- Co-authored-by: abhishekbafna <abhishek.ba...@startree.ai> --- .../segment/local/utils/SegmentPushUtils.java | 127 ++++++++++++++------- .../segment/local/utils/SegmentPushUtilsTest.java | 81 ++++++++++++- .../spi/ingestion/batch/spec/PushJobSpec.java | 14 +++ 3 files changed, 177 insertions(+), 45 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 5403575684..d4e39a11a9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -31,10 +31,16 @@ import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hc.core5.http.Header; @@ -373,47 +379,24 @@ public class SegmentPushUtils implements Serializable { Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters) throws Exception { String tableName = spec.getTableSpec().getTableName(); - Map<String, File> segmentMetadataFileMap = new HashMap<>(); - List<String> segmentURIs = new ArrayList<>(); - LOGGER.info("Start pushing segment metadata: {} to locations: {} for table: {}", segmentUriToTarPathMap, - Arrays.toString(spec.getPinotClusterSpecs()), tableName); - for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { - String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath); - String fileName = new File(tarFilePath).getName(); - // segments stored in Pinot deep store do not have .tar.gz extension - String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT) - ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName; - SegmentNameUtils.validatePartialOrFullSegmentName(segmentName); - File segmentMetadataFile; - // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote - // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment, - // which requires downloading the entire segment tar gz file. - - URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName); - LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath); - if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) { - segmentMetadataFile = new File(FileUtils.getTempDirectory(), - SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID() - + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); - if (segmentMetadataFile.exists()) { - FileUtils.forceDelete(segmentMetadataFile); - } - fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile); - } else { - segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); - } - segmentMetadataFileMap.put(segmentName, segmentMetadataFile); - segmentURIs.add(segmentName); - segmentURIs.add(segmentUriPath); - } - - File allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap); + ConcurrentHashMap<String, File> segmentMetadataFileMap = new ConcurrentHashMap<>(); + ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>(); Map<String, File> allSegmentsMetadataMap = new HashMap<>(); - // the key is unused in batch upload mode and hence 'noopKey' - allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile); + File allSegmentsMetadataTarFile = null; + int nThreads = spec.getPushJobSpec().getSegmentMetadataGenerationParallelism(); + ExecutorService executor = Executors.newFixedThreadPool(nThreads); + LOGGER.info("Start pushing segment metadata: {} to locations: {} for table: {} with parallelism: {}", + segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()), tableName, + spec.getPushJobSpec().getPushParallelism()); - // perform metadata push in batch mode for every cluster try { + generateSegmentMetadataFiles(spec, fileSystem, segmentUriToTarPathMap, segmentMetadataFileMap, segmentURIs, + executor); + allSegmentsMetadataTarFile = createSegmentsMetadataTarFile(segmentURIs, segmentMetadataFileMap); + // the key is unused in batch upload mode and hence 'noopKey' + allSegmentsMetadataMap.put("noopKey", allSegmentsMetadataTarFile); + + // perform metadata push in batch mode for every cluster for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { URI controllerURI; try { @@ -458,10 +441,70 @@ public class SegmentPushUtils implements Serializable { }); } } finally { - for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) { + for (Map.Entry<String, File> metadataFileEntry : segmentMetadataFileMap.entrySet()) { FileUtils.deleteQuietly(metadataFileEntry.getValue()); } - FileUtils.forceDelete(allSegmentsMetadataTarFile); + if (allSegmentsMetadataTarFile != null) { + FileUtils.deleteQuietly(allSegmentsMetadataTarFile); + } + executor.shutdown(); + } + } + + @VisibleForTesting + static void generateSegmentMetadataFiles(SegmentGenerationJobSpec spec, PinotFS fileSystem, + Map<String, String> segmentUriToTarPathMap, ConcurrentHashMap<String, File> segmentMetadataFileMap, + ConcurrentLinkedQueue<String> segmentURIs, ExecutorService executor) { + + List<Future<Void>> futures = new ArrayList<>(); + // Generate segment metadata files in parallel + for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { + futures.add( + executor.submit(() -> { + String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath); + String fileName = new File(tarFilePath).getName(); + // segments stored in Pinot deep store do not have .tar.gz extension + String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT) + ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName; + SegmentNameUtils.validatePartialOrFullSegmentName(segmentName); + File segmentMetadataFile; + // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the + // remote directory. This is to avoid generating a new segment metadata tar gz file every time we push a + // segment, which requires downloading the entire segment tar gz file. + + URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName); + LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath); + if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) { + segmentMetadataFile = new File(FileUtils.getTempDirectory(), + SegmentUploadConstants.SEGMENT_METADATA_DIR_PREFIX + UUID.randomUUID() + + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + if (segmentMetadataFile.exists()) { + FileUtils.forceDelete(segmentMetadataFile); + } + fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile); + } else { + segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + } + segmentMetadataFileMap.put(segmentName, segmentMetadataFile); + segmentURIs.add(segmentName); + segmentURIs.add(segmentUriPath); + return null; + })); + } + int errorCount = 0; + Exception exception = null; + for (Future<Void> future : futures) { + try { + future.get(); + } catch (Exception e) { + errorCount++; + exception = e; + } + } + if (errorCount > 0) { + throw new RuntimeException( + String.format("%d out of %d segment metadata generation failed", errorCount, segmentUriToTarPathMap.size()), + exception); } } @@ -483,13 +526,13 @@ public class SegmentPushUtils implements Serializable { // Additionally, it contains a segmentName to segmentDownloadURI mapping file which allows us to avoid sending the // segmentDownloadURI as a header field as there are limitations on the number of headers allowed in the http request. @VisibleForTesting - static File createSegmentsMetadataTarFile(List<String> segmentURIs, Map<String, File> segmentMetadataFileMap) + static File createSegmentsMetadataTarFile(Collection<String> segmentURIs, Map<String, File> segmentMetadataFileMap) throws IOException { String uuid = UUID.randomUUID().toString(); File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(), SegmentUploadConstants.ALL_SEGMENTS_METADATA_DIR_PREFIX + uuid); FileUtils.forceMkdir(allSegmentsMetadataDir); - for (Map.Entry<String, File> segmentMetadataTarFileEntry: segmentMetadataFileMap.entrySet()) { + for (Map.Entry<String, File> segmentMetadataTarFileEntry : segmentMetadataFileMap.entrySet()) { String segmentName = segmentMetadataTarFileEntry.getKey(); File tarFile = segmentMetadataTarFileEntry.getValue(); TarCompressionUtils.untarOneFile(tarFile, V1Constants.MetadataKeys.METADATA_FILE_NAME, diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java index 1fb5cfd43a..8a86283e15 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/SegmentPushUtilsTest.java @@ -28,17 +28,26 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; import org.apache.hc.core5.http.NameValuePair; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -47,13 +56,15 @@ public class SegmentPushUtilsTest { private File _tempDir; @BeforeMethod - public void setUp() throws IOException { + public void setUp() + throws IOException { _tempDir = new File(FileUtils.getTempDirectory(), "test-" + UUID.randomUUID()); FileUtils.forceMkdir(_tempDir); } @AfterMethod - public void tearDown() throws IOException { + public void tearDown() + throws IOException { FileUtils.deleteDirectory(_tempDir); } @@ -122,7 +133,8 @@ public class SegmentPushUtilsTest { } @Test - public void testCreateSegmentsMetadataTarFile() throws IOException { + public void testCreateSegmentsMetadataTarFile() + throws IOException { // setup List<String> segmentURIs = Arrays.asList("http://example.com/segment1", "http://example.com/segment2"); @@ -156,4 +168,67 @@ public class SegmentPushUtilsTest { assertTrue(result.exists(), "The resulting tar.gz file should exist"); assertTrue(result.getName().endsWith(".tar.gz"), "The resulting file should have a .tar.gz extension"); } + + @Test + public void testGenerateSegmentMetadataFiles() + throws Exception { + SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setSegmentMetadataGenerationParallelism(2); + jobSpec.setPushJobSpec(pushJobSpec); + PinotFS mockFs = Mockito.mock(PinotFS.class); + Map<String, String> segmentUriToTarPathMap = Map.of( + "segment1", "segment1.tar.gz", + "segment2", "segment2.tar.gz" + ); + ConcurrentHashMap<String, File> segmentMetadataFileMap = new ConcurrentHashMap<>(); + ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>(); + + when(mockFs.exists(any(URI.class))).thenReturn(true); + mockFs.copyToLocalFile(any(URI.class), any(File.class)); + + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + SegmentPushUtils.generateSegmentMetadataFiles(jobSpec, mockFs, segmentUriToTarPathMap, segmentMetadataFileMap, + segmentURIs, executor); + } finally { + executor.shutdown(); + } + + assertEquals(segmentMetadataFileMap.size(), 2); + assertEquals(segmentURIs.size(), 4); + } + + @Test + public void testGenerateSegmentMetadataFilesFailure() + throws Exception { + SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); + PushJobSpec pushJobSpec = new PushJobSpec(); + jobSpec.setPushJobSpec(pushJobSpec); + PinotFS mockFs = Mockito.mock(PinotFS.class); + Map<String, String> segmentUriToTarPathMap = Map.of( + "segment1", "segment1.tar.gz", + "segment2", "segment2.tar.gz" + ); + ConcurrentHashMap<String, File> segmentMetadataFileMap = new ConcurrentHashMap<>(); + ConcurrentLinkedQueue<String> segmentURIs = new ConcurrentLinkedQueue<>(); + + when(mockFs.exists(any(URI.class))).thenReturn(true); + doNothing().doThrow(new RuntimeException("test exception")) + .when(mockFs) + .copyToLocalFile(any(URI.class), any(File.class)); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + SegmentPushUtils.generateSegmentMetadataFiles(jobSpec, mockFs, segmentUriToTarPathMap, segmentMetadataFileMap, + segmentURIs, executor); + } catch (Exception e) { + assertEquals(e.getMessage(), "1 out of 2 segment metadata generation failed"); + } finally { + executor.shutdown(); + } + + assertEquals(segmentMetadataFileMap.size(), 1); + assertEquals(segmentURIs.size(), 2); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java index c087e364cd..d67f8d1785 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java @@ -53,6 +53,12 @@ public class PushJobSpec implements Serializable { */ private boolean _batchSegmentUpload; + /** + * Applicable for METADATA push type. + * Number of threads to use for segment metadata generation. + */ + private int _segmentMetadataGenerationParallelism = 1; + /** * Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller. * The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix} @@ -163,4 +169,12 @@ public class PushJobSpec implements Serializable { public void setBatchSegmentUpload(boolean batchSegmentUpload) { _batchSegmentUpload = batchSegmentUpload; } + + public int getSegmentMetadataGenerationParallelism() { + return _segmentMetadataGenerationParallelism; + } + + public void setSegmentMetadataGenerationParallelism(int segmentMetadataGenerationParallelism) { + _segmentMetadataGenerationParallelism = segmentMetadataGenerationParallelism; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org