This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch SegmentProcessorFrameworkImprovement
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 002219055986a8737601688ca2f379d889b0ec65
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Thu Jul 25 11:16:08 2024 -0700

    Added support to upload segments in batch mode with METADATA upload type (#13690)
---
 .../common/utils/FileUploadDownloadClient.java     |   7 +
 .../PinotSegmentUploadDownloadRestletResource.java | 355 ++++++++++++++++++++-
 .../api/upload/SegmentUploadMetadata.java          | 117 +++++++
 .../pinot/controller/api/upload/ZKOperator.java    | 321 +++++++++++++++++++
 .../helix/core/PinotHelixResourceManager.java      |  78 +++++
 .../SegmentUploadIntegrationTest.java              | 104 +++++-
 .../BaseMultipleSegmentsConversionExecutor.java    | 212 +++++++-----
 .../segment/local/utils/SegmentPushUtils.java      | 130 ++++++++
 .../spi/ingestion/batch/spec/PushJobSpec.java      |  15 +
 9 files changed, 1255 insertions(+), 84 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 4123e3157e..4a2cb33be2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -113,6 +113,7 @@ public class FileUploadDownloadClient implements AutoCloseable {
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
+  private static final String SEGMENT_UPLOAD_BATCH_PATH = "/v3/segments";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "type=";
   private static final String START_REPLACE_SEGMENTS_PATH = "/startReplaceSegments";
@@ -365,6 +366,12 @@ public class FileUploadDownloadClient implements AutoCloseable {
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
   }
 
+  public static URI getUploadSegmentBatchURI(URI controllerURI)
+      throws URISyntaxException {
+    return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),
+        SEGMENT_UPLOAD_BATCH_PATH);
+  }
+
   public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType,
       boolean forceCleanup)
       throws URISyntaxException {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 156a3e9095..2f5082e5b2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -29,14 +29,20 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import 
java.util.Map; import java.util.UUID; @@ -59,6 +65,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; +import org.apache.commons.io.Charsets; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -73,6 +80,7 @@ import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType; +import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.controller.ControllerConf; @@ -81,6 +89,7 @@ import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.api.upload.SegmentUploadMetadata; import org.apache.pinot.controller.api.upload.SegmentValidationUtils; import org.apache.pinot.controller.api.upload.ZKOperator; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -101,6 +110,7 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.glassfish.grizzly.http.server.Request; +import org.glassfish.jersey.media.multipart.BodyPart; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.server.ManagedAsync; @@ -295,13 +305,18 @@ public class PinotSegmentUploadDownloadRestletResource { extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); createSegmentFileFromMultipart(multiPart, destFile); + PinotFS pinotFS = null; try { URI segmentURI = new URI(sourceDownloadURIStr); - PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + pinotFS = PinotFSFactory.create(segmentURI.getScheme()); segmentSizeInBytes = pinotFS.length(segmentURI); } catch (Exception e) { segmentSizeInBytes = -1; LOGGER.warn("Could not fetch segment size for metadata push", e); + } finally { + if (pinotFS != null) { + pinotFS.close(); + } } break; default: @@ -403,6 +418,234 @@ public class PinotSegmentUploadDownloadRestletResource { } } + // Method used to update a list of segments in batch mode with the METADATA upload type. + private SuccessResponse uploadSegments(String tableName, TableType tableType, FormDataMultiPart multiParts, + boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) { + long segmentsUploadStartTimeMs = System.currentTimeMillis(); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String tableNameWithType = tableType == TableType.OFFLINE ? 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName) + : TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + throw new ControllerApplicationException(LOGGER, "Failed to fetch table config for table: " + tableNameWithType, + Response.Status.BAD_REQUEST); + } + + String clientAddress; + try { + clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName(); + } catch (UnknownHostException e) { + throw new ControllerApplicationException(LOGGER, "Failed to resolve hostname from input request", + Response.Status.BAD_REQUEST, e); + } + + String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); + FileUploadType uploadType = getUploadType(uploadTypeStr); + if (!FileUploadType.METADATA.equals(uploadType)) { + throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadTypeStr, + Response.Status.BAD_REQUEST); + } + + String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); + String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); + ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); + List<SegmentUploadMetadata> segmentUploadMetadataList = new ArrayList<>(); + List<File> tempFiles = new ArrayList<>(); + List<String> segmentNames = new ArrayList<>(); + List<BodyPart> bodyParts = multiParts.getBodyParts(); + LOGGER.info("Uploading segments in batch mode of size: {}", bodyParts.size()); + + // there would be just one body part + FormDataBodyPart bodyPartFromReq = (FormDataBodyPart) bodyParts.get(0); + String uuid = UUID.randomUUID().toString(); + File allSegmentsMetadataTarFile = new File(FileUtils.getTempDirectory(), "allSegmentsMetadataTar-" + uuid + + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + try { + createSegmentFileFromBodyPart(bodyPartFromReq, allSegmentsMetadataTarFile); + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, "Failed to extract segment metadata files from the input " + + "request. ", Response.Status.INTERNAL_SERVER_ERROR, e); + } + + List<File> segmentMetadataFiles = new ArrayList<>(); + File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(), "allSegmentsMetadataDir-" + uuid); + try { + FileUtils.forceMkdir(allSegmentsMetadataDir); + List<File> metadataFiles = TarGzCompressionUtils.untar(allSegmentsMetadataTarFile, allSegmentsMetadataDir); + if (!metadataFiles.isEmpty()) { + segmentMetadataFiles.addAll(metadataFiles); + } + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, "Failed to unzip the segment metadata files. 
", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + + // segmentName, (creation.meta, metadata.properties) + Map<String, SegmentMetadataInfo> segmentMetadataFileMap = new HashMap<>(); + // segmentName, segmentDownloadURI + Map<String, String> segmentURIMap = new HashMap<>(); + for (File file: segmentMetadataFiles) { + String fileName = file.getName(); + if (fileName.equalsIgnoreCase("all_segments_metadata")) { + try (InputStream inputStream = FileUtils.openInputStream(file)) { + final InputStreamReader reader = new InputStreamReader(inputStream, Charsets.toCharset( + StandardCharsets.UTF_8)); + try (BufferedReader bufReader = IOUtils.toBufferedReader(reader)) { + String segmentNameLine; + String segmentURILine; + while ((segmentNameLine = bufReader.readLine()) != null) { + segmentURILine = bufReader.readLine(); + segmentURIMap.put(segmentNameLine, segmentURILine); + } + } + } catch (IOException e) { + throw new ControllerApplicationException(LOGGER, "Failed to read the all_segment_metadata file. ", + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } else if (fileName.endsWith(".creation.meta")) { + int suffixLength = ".creation.meta".length(); + String segmentName = fileName.substring(0, fileName.length() - suffixLength); + SegmentMetadataInfo segmentMetadataInfo = segmentMetadataFileMap.getOrDefault(segmentName, + new SegmentMetadataInfo()); + segmentMetadataInfo.setSegmentCreationMetaFile(file); + segmentMetadataFileMap.put(segmentName, segmentMetadataInfo); + } else if (fileName.endsWith(".metadata.properties")) { + int suffixLength = ".metadata.properties".length(); + String segmentName = fileName.substring(0, fileName.length() - suffixLength); + SegmentMetadataInfo segmentMetadataInfo = segmentMetadataFileMap.getOrDefault(segmentName, + new SegmentMetadataInfo()); + segmentMetadataInfo.setSegmentPropertiesFile(file); + segmentMetadataFileMap.put(segmentName, segmentMetadataInfo); + } + } + + try { + int entryCount = 0; + for (Map.Entry<String, SegmentMetadataInfo> entry: segmentMetadataFileMap.entrySet()) { + String segmentName = entry.getKey(); + SegmentMetadataInfo segmentMetadataInfo = entry.getValue(); + segmentNames.add(segmentName); + File tempEncryptedFile; + File tempDecryptedFile; + File tempSegmentDir; + String sourceDownloadURIStr = segmentURIMap.get(segmentName); + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + throw new ControllerApplicationException(LOGGER, + "'DOWNLOAD_URI' is required as a field within the multipart object for METADATA batch upload mode.", + Response.Status.BAD_REQUEST); + } + // The downloadUri for putting into segment zk metadata + String segmentDownloadURIStr = sourceDownloadURIStr; + + String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID(); + tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX); + tempFiles.add(tempEncryptedFile); + tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName); + tempFiles.add(tempDecryptedFile); + tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName); + tempFiles.add(tempSegmentDir); + boolean encryptSegment = StringUtils.isNotEmpty(crypterClassNameInHeader); + File destFile = encryptSegment ? 
tempEncryptedFile : tempDecryptedFile; + // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE + // else set to false for backward compatibility + String copySegmentToDeepStore = + extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); + boolean copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); + createSegmentFileFromSegmentMetadataInfo(segmentMetadataInfo, destFile); + // TODO: Include the untarred segment size when using the METADATA push rest API. Currently we can only use the + // tarred segment size as an approximation. + long segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr); + + if (encryptSegment) { + decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile); + } + + String metadataProviderClass = DefaultMetadataExtractor.class.getName(); + SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass); + LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, " + + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, + ingestionDescriptor); + + // Validate segment + if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) { + SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig); + } + + // Encrypt segment + String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName(); + Pair<String, File> encryptionInfo = + encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, encryptSegment, crypterClassNameInHeader, + crypterNameInTableConfig, segmentName, tableNameWithType); + File segmentFile = encryptionInfo.getRight(); + + // Update download URI if controller is responsible for moving the segment to the deep store + URI finalSegmentLocationURI = null; + if (copySegmentToFinalLocation) { + URI dataDirURI = provider.getDataDirURI(); + String dataDirPath = dataDirURI.toString(); + String encodedSegmentName = URIUtils.encode(segmentName); + String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName); + if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { + segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName); + } else { + segmentDownloadURIStr = finalSegmentLocationPath; + } + finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath); + } + SegmentUploadMetadata segmentUploadMetadata = + new SegmentUploadMetadata(segmentDownloadURIStr, sourceDownloadURIStr, finalSegmentLocationURI, + segmentSizeInBytes, segmentMetadata, encryptionInfo); + segmentUploadMetadataList.add(segmentUploadMetadata); + LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})", + segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation); + // complete segment operations for all the segments + if (++entryCount == segmentMetadataFileMap.size()) { + ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); + zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, + allowRefresh, headers, segmentUploadMetadataList); + } + } + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, + 
segmentUploadMetadataList.size()); + throw new ControllerApplicationException(LOGGER, + "Exception while processing segments to upload: " + e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } finally { + cleanupTempFiles(tempFiles); + } + + return new SuccessResponse(String.format("Successfully uploaded segments: %s of table: %s in %s ms", + segmentNames, tableNameWithType, System.currentTimeMillis() - segmentsUploadStartTimeMs)); + } + + private static class SegmentMetadataInfo { + private File _segmentCreationMetaFile; + private File _segmentPropertiesFile; + + public File getSegmentCreationMetaFile() { + return _segmentCreationMetaFile; + } + + public File getSegmentPropertiesFile() { + return _segmentPropertiesFile; + } + + public void setSegmentCreationMetaFile(File file) { + _segmentCreationMetaFile = file; + } + + public void setSegmentPropertiesFile(File file) { + _segmentPropertiesFile = file; + } + } + + private void cleanupTempFiles(List<File> tempFiles) { + for (File tempFile : tempFiles) { + FileUtils.deleteQuietly(tempFile); + } + } + @Nullable private String extractHttpHeader(HttpHeaders headers, String name) { String value = headers.getHeaderString(name); @@ -555,6 +798,65 @@ public class PinotSegmentUploadDownloadRestletResource { } } + @POST + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Path("/v3/segments") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Cluster.UPLOAD_SEGMENT) + @Authenticate(AccessType.CREATE) + @ApiOperation(value = "Upload a batch of segments", notes = "Upload a batch of segments with METADATA upload type") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully uploaded segment"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), + @ApiResponse(code = 500, message = "Internal error") + }) + @TrackInflightRequestMetrics + @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS) + // This multipart based endpoint is used to upload a list of segments in batch mode. 
+ public void uploadSegmentsAsMultiPart(FormDataMultiPart multiPart, + @ApiParam(value = "Name of the table", required = true) + @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) + String tableName, + @ApiParam(value = "Type of the table", required = true) + @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) + String tableType, + @ApiParam(value = "Whether to enable parallel push protection") + @DefaultValue("false") + @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) + boolean enableParallelPushProtection, + @ApiParam(value = "Whether to refresh if the segment already exists") + @DefaultValue("true") + @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) + boolean allowRefresh, + @Context HttpHeaders headers, + @Context Request request, + @Suspended final AsyncResponse asyncResponse) { + if (StringUtils.isEmpty(tableName)) { + throw new ControllerApplicationException(LOGGER, + "tableName is a required field while uploading segments in batch mode.", Response.Status.BAD_REQUEST); + } + if (StringUtils.isEmpty(tableType)) { + throw new ControllerApplicationException(LOGGER, + "tableType is a required field while uploading segments in batch mode.", Response.Status.BAD_REQUEST); + } + if (multiPart == null) { + throw new ControllerApplicationException(LOGGER, + "multiPart is a required field while uploading segments in batch mode.", Response.Status.BAD_REQUEST); + } + try { + asyncResponse.resume( + uploadSegments(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection, + allowRefresh, headers, request)); + } catch (Throwable t) { + asyncResponse.resume(t); + } + } + @POST @ManagedAsync @Produces(MediaType.APPLICATION_JSON) @@ -752,6 +1054,38 @@ public class PinotSegmentUploadDownloadRestletResource { } } + @VisibleForTesting + static void createSegmentFileFromBodyPart(FormDataBodyPart segmentMetadataBodyPart, File destFile) + throws IOException { + try (InputStream inputStream = segmentMetadataBodyPart.getValueAs(InputStream.class); + OutputStream outputStream = new FileOutputStream(destFile)) { + IOUtils.copyLarge(inputStream, outputStream); + } finally { + segmentMetadataBodyPart.cleanup(); + } + } + + static void createSegmentFileFromSegmentMetadataInfo(SegmentMetadataInfo metadataInfo, File destFile) + throws IOException { + File creationMetaFile = metadataInfo.getSegmentCreationMetaFile(); + File metadataPropertiesFile = metadataInfo.getSegmentPropertiesFile(); + String uuid = UUID.randomUUID().toString(); + File segmentMetadataDir = new File(FileUtils.getTempDirectory(), "segmentMetadataDir-" + uuid); + FileUtils.copyFile(creationMetaFile, new File(segmentMetadataDir, "creation.meta")); + FileUtils.copyFile(metadataPropertiesFile, new File(segmentMetadataDir, "metadata.properties")); + File segmentMetadataTarFile = new File(FileUtils.getTempDirectory(), "segmentMetadataTar-" + uuid + + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + if (segmentMetadataTarFile.exists()) { + FileUtils.forceDelete(segmentMetadataTarFile); + } + TarGzCompressionUtils.createTarGzFile(segmentMetadataDir, segmentMetadataTarFile); + try { + FileUtils.copyFile(segmentMetadataTarFile, destFile); + } finally { + FileUtils.forceDelete(segmentMetadataTarFile); + } + } + private FileUploadType getUploadType(String uploadTypeStr) { if (uploadTypeStr != null) { return FileUploadType.valueOf(uploadTypeStr); @@ -760,6 +1094,25 @@ public class PinotSegmentUploadDownloadRestletResource { } } + 
@VisibleForTesting + long getSegmentSizeFromFile(String sourceDownloadURIStr) + throws IOException { + long segmentSizeInBytes = -1; + PinotFS pinotFS = null; + try { + URI segmentURI = new URI(sourceDownloadURIStr); + pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + segmentSizeInBytes = pinotFS.length(segmentURI); + } catch (Exception e) { + LOGGER.warn(String.format("Exception while segment size for uri: %s", sourceDownloadURIStr), e); + } finally { + if (pinotFS != null) { + pinotFS.close(); + } + } + return segmentSizeInBytes; + } + // Validate that there is one file that is in the input. public static boolean validateMultiPart(Map<String, List<FormDataBodyPart>> map, String segmentName) { boolean isGood = true; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java new file mode 100644 index 0000000000..d46891572b --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.upload; + +import java.io.File; +import java.net.URI; +import java.util.Objects; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.segment.spi.SegmentMetadata; + + +/** + * Data object used while adding or updating segments. 
It's comprised of the following fields: + * <ol> + * <li>segmentDownloadURIStr – The segment download URI persisted into the ZK metadata.</li> + * <li>sourceDownloadURIStr – The URI from where the segment could be downloaded.</li> + * <li>finalSegmentLocationURI – The final location of the segment in the deep-store.</li> + * <li>segmentSizeInBytes – The segment size in bytes.</li> + * <li>segmentMetadata – The segment metadata as defined in {@link org.apache.pinot.segment.spi.SegmentMetadata}.</li> + * <li>encryptionInfo – A pair consisting of the crypter class used to encrypt the segment, and the encrypted segment + * file.</li> + * <li>segmentMetadataZNRecord – The segment metadata represented as a helix + * {@link org.apache.helix.zookeeper.datamodel.ZNRecord}.</li> + * </ol> + */ +public class SegmentUploadMetadata { + private final String _segmentDownloadURIStr; + private final String _sourceDownloadURIStr; + private final URI _finalSegmentLocationURI; + private final Long _segmentSizeInBytes; + private final SegmentMetadata _segmentMetadata; + private final Pair<String, File> _encryptionInfo; + private ZNRecord _segmentMetadataZNRecord; + + public SegmentUploadMetadata(String segmentDownloadURIStr, String sourceDownloadURIStr, URI finalSegmentLocationURI, + Long segmentSizeInBytes, SegmentMetadata segmentMetadata, Pair<String, File> encryptionInfo) { + _segmentDownloadURIStr = segmentDownloadURIStr; + _sourceDownloadURIStr = sourceDownloadURIStr; + _segmentSizeInBytes = segmentSizeInBytes; + _segmentMetadata = segmentMetadata; + _encryptionInfo = encryptionInfo; + _finalSegmentLocationURI = finalSegmentLocationURI; + } + + public String getSegmentDownloadURIStr() { + return _segmentDownloadURIStr; + } + + public String getSourceDownloadURIStr() { + return _sourceDownloadURIStr; + } + + public URI getFinalSegmentLocationURI() { + return _finalSegmentLocationURI; + } + + public Long getSegmentSizeInBytes() { + return _segmentSizeInBytes; + } + + public SegmentMetadata getSegmentMetadata() { + return _segmentMetadata; + } + + public Pair<String, File> getEncryptionInfo() { + return _encryptionInfo; + } + + public void setSegmentMetadataZNRecord(ZNRecord segmentMetadataZNRecord) { + _segmentMetadataZNRecord = segmentMetadataZNRecord; + } + + public ZNRecord getSegmentMetadataZNRecord() { + return _segmentMetadataZNRecord; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentUploadMetadata that = (SegmentUploadMetadata) o; + return Objects.equals(_segmentDownloadURIStr, that._segmentDownloadURIStr) + && Objects.equals(_sourceDownloadURIStr, that._sourceDownloadURIStr) + && Objects.equals(_finalSegmentLocationURI, that._finalSegmentLocationURI) + && Objects.equals(_segmentSizeInBytes, that._segmentSizeInBytes) + && Objects.equals(_segmentMetadata, that._segmentMetadata) + && Objects.equals(_encryptionInfo, that._encryptionInfo) + && Objects.equals(_segmentMetadataZNRecord, that._segmentMetadataZNRecord); + } + + @Override + public int hashCode() { + return Objects.hash(_segmentDownloadURIStr, _sourceDownloadURIStr, _finalSegmentLocationURI, + _segmentSizeInBytes, _segmentMetadata, _encryptionInfo, _segmentMetadataZNRecord); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index b0aee83d0d..32249320b9 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -21,9 +21,14 @@ package org.apache.pinot.controller.api.upload; import com.google.common.base.Preconditions; import java.io.File; import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.IdealState; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -110,6 +115,61 @@ public class ZKOperator { } } + // Complete segment operations for a list of segments in batch mode + public void completeSegmentsOperations(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, + List<SegmentUploadMetadata> segmentUploadMetadataList) + throws Exception { + boolean refreshOnly = + Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY)); + List<SegmentUploadMetadata> newSegmentsList = new ArrayList<>(); + List<SegmentUploadMetadata> existingSegmentsList = new ArrayList<>(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); + + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName, + existingSegmentMetadataZNRecord, enableParallelPushProtection)) { + LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", + tableNameWithType, segmentName); + Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), + "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName); + existingSegmentMetadataZNRecord = null; + } + + if (existingSegmentMetadataZNRecord == null) { + // Add a new segment + if (refreshOnly) { + throw new ControllerApplicationException(LOGGER, + String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), + Response.Status.GONE); + } + LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); + newSegmentsList.add(segmentUploadMetadata); + } else { + // Refresh an existing segment + if (!allowRefresh) { + // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the + // check done up-front but ends up getting created before the check here, we could incorrectly refresh an + // existing segment. + throw new ControllerApplicationException(LOGGER, + String.format("Segment: %s already exists in table: %s. 
Refresh not permitted.", segmentName, + tableNameWithType), Response.Status.CONFLICT); + } + LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); + segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord); + existingSegmentsList.add(segmentUploadMetadata); + } + } + // process new segments + processNewSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, newSegmentsList); + + // process existing segments + processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList); + } + /** * Returns {@code true} when the segment should be processed as new segment. * <p>When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after @@ -276,6 +336,144 @@ public class ZKOperator { } } + // process a batch of existing segments + private void processExistingSegments(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, HttpHeaders headers, List<SegmentUploadMetadata> segmentUploadMetadataList) + throws Exception { + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentDownloadURIStr = segmentUploadMetadata.getSegmentDownloadURIStr(); + String sourceDownloadURIStr = segmentUploadMetadata.getSourceDownloadURIStr(); + URI finalSegmentLocationURI = segmentUploadMetadata.getFinalSegmentLocationURI(); + Pair<String, File> encryptionInfo = segmentUploadMetadata.getEncryptionInfo(); + String crypterName = encryptionInfo.getLeft(); + File segmentFile = encryptionInfo.getRight(); + String segmentName = segmentMetadata.getName(); + ZNRecord existingSegmentMetadataZNRecord = segmentUploadMetadata.getSegmentMetadataZNRecord(); + long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes(); + int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); + + // Check if CRC match when IF-MATCH header is set + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingCrc = segmentZKMetadata.getCrc(); + checkCRC(headers, tableNameWithType, segmentName, existingCrc); + + // Check segment upload start time when parallel push protection enabled + if (enableParallelPushProtection) { + // When segment upload start time is larger than 0, that means another upload is in progress + long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + if (segmentUploadStartTime > 0) { + handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime); + } + + // Lock the segment by setting the upload start time in ZK + segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis()); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), + Response.Status.CONFLICT); + } else { + // The version will increment if the zk metadata update is successful + expectedVersion++; + } + } + + // Reset segment upload start time to unlock the segment later + // NOTE: reset this value even if parallel push protection is not enabled so that segment can recover in case + // previous segment upload did not finish properly and the parallel push protection is turned off + 
segmentZKMetadata.setSegmentUploadStartTime(-1); + + try { + // Construct the segment ZK metadata custom map modifier + String customMapModifierStr = + headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER); + SegmentZKMetadataCustomMapModifier customMapModifier = + customMapModifierStr != null ? new SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null; + + // Update ZK metadata and refresh the segment if necessary + long newCrc = Long.parseLong(segmentMetadata.getCrc()); + if (newCrc == existingCrc) { + LOGGER.info( + "New segment crc '{}' is the same as existing segment crc for segment '{}'. Updating ZK metadata without " + + "refreshing the segment.", newCrc, segmentName); + // NOTE: Even though we don't need to refresh the segment, we should still update the following fields: + // - Creation time (not included in the crc) + // - Refresh time + // - Custom map + segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); + segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); + if (customMapModifier != null) { + segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + } else { + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); + } + if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) { + // For offline ingestion, it is quite common that the download.uri would change but the crc would be the + // same. E.g. a user re-runs the job which process the same data and segments are stored/pushed from a + // different path from the Deepstore. Read more: https://github.com/apache/pinot/issues/11535 + LOGGER.info("Updating segment download url from: {} to: {} even though crc is the same", + segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr); + segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr); + // When download URI changes, we also need to copy the segment to the final location if existed. + // This typically means users changed the push type from METADATA to SEGMENT or SEGMENT to METADATA. + // Note that switching push type from SEGMENT to METADATA may lead orphan segments in the controller + // managed directory. Read more: https://github.com/apache/pinot/pull/11720 + if (finalSegmentLocationURI != null) { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } + } + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } + } else { + // New segment is different with the existing one, update ZK metadata and refresh the segment + LOGGER.info( + "New segment crc {} is different than the existing segment crc {}. 
Updating ZK metadata and refreshing " + + "segment {}", newCrc, existingCrc, segmentName); + if (finalSegmentLocationURI != null) { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } + + // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on + // segment ZK metadata to refresh the segment (server will compare the segment ZK metadata with the local + // metadata to decide whether to download the new segment; broker will update the segment partition info & + // time boundary based on the segment ZK metadata) + if (customMapModifier == null) { + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(null); + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + } else { + // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + } + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } + LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType); + + // Send a message to servers and brokers hosting the table to refresh the segment + _pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true); + } + } catch (Exception e) { + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, expected version: {}", segmentName, + tableNameWithType, expectedVersion); + } + throw e; + } + } + } + private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) { String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH); if (expectedCrcStr != null) { @@ -374,6 +572,102 @@ public class ZKOperator { } } + // process a batch of new segments + private void processNewSegments(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, HttpHeaders headers, List<SegmentUploadMetadata> segmentUploadMetadataList) + throws Exception { + Map<String, SegmentZKMetadata> segmentZKMetadataMap = new HashMap<>(); + List<String> segmentNames = new ArrayList<>(); + long segmentUploadStartTime = System.currentTimeMillis(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); + SegmentZKMetadata newSegmentZKMetadata; + URI finalSegmentLocationURI = segmentUploadMetadata.getFinalSegmentLocationURI(); + String segmentDownloadURIStr = segmentUploadMetadata.getSegmentDownloadURIStr(); + String sourceDownloadURIStr = segmentUploadMetadata.getSourceDownloadURIStr(); + String crypterName = segmentUploadMetadata.getEncryptionInfo().getLeft(); + long segmentSizeInBytes = 
segmentUploadMetadata.getSegmentSizeInBytes(); + File segmentFile = segmentUploadMetadata.getEncryptionInfo().getRight(); + try { + newSegmentZKMetadata = ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + segmentZKMetadataMap.put(segmentName, newSegmentZKMetadata); + segmentNames.add(segmentName); + } catch (IllegalArgumentException e) { + throw new ControllerApplicationException(LOGGER, + String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName, + tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST); + } + + // Lock if enableParallelPushProtection is true. + if (enableParallelPushProtection) { + newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime); + } + + // Update zk metadata custom map + String segmentZKMetadataCustomMapModifierStr = headers != null ? headers.getHeaderString( + FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) : null; + if (segmentZKMetadataCustomMapModifierStr != null) { + SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = new SegmentZKMetadataCustomMapModifier( + segmentZKMetadataCustomMapModifierStr); + newSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap( + newSegmentZKMetadata.getCustomMap())); + } + if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) { + throw new RuntimeException(String.format("Failed to create ZK metadata for segment: %s of table: %s", + segmentName, tableNameWithType)); + } + + if (finalSegmentLocationURI != null) { + try { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } catch (Exception e) { + // Cleanup the Zk entry and the segment from the permanent directory if it exists. + LOGGER.error("Could not move segment {} from table {} to permanent directory", + segmentName, tableNameWithType, e); + // Delete all segments that are getting processed as we are in batch mode + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, enableParallelPushProtection); + throw e; + } + } + } + + try { + _pinotHelixResourceManager.assignTableSegments(tableNameWithType, segmentNames); + } catch (Exception e) { + // assignTableSegment removes the zk entry. + // Call deleteSegment to remove the segment from permanent location if needed. + LOGGER.error("Caught exception while calling assignTableSegments for adding segments: {} to table: {}", + segmentZKMetadataMap.keySet(), tableNameWithType, e); + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, enableParallelPushProtection); + throw e; + } + + for (Map.Entry<String, SegmentZKMetadata> segmentZKMetadataEntry: segmentZKMetadataMap.entrySet()) { + SegmentZKMetadata newSegmentZKMetadata = segmentZKMetadataEntry.getValue(); + String segmentName = segmentZKMetadataEntry.getKey(); + if (enableParallelPushProtection) { + // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile. + newSegmentZKMetadata.setSegmentUploadStartTime(-1); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) { + // There is a race condition when it took too much time for the 1st segment upload to process (due to slow + // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded. 
+ // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to + // update the zk metadata. Instead, the 1st attempt should validate the upload start time one more time. + // If the start time doesn't match with the one persisted in zk metadata, segment deletion should be skipped. + String errorMsg = String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentName, + tableNameWithType); + LOGGER.error(errorMsg); + // Delete all segments that are getting processed as we are in batch mode + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, true); + throw new RuntimeException(errorMsg); + } + } + } + } + /** * Deletes the segment to be uploaded if either one of the criteria is qualified: * 1) the uploadStartTime matches with the one persisted in ZK metadata. @@ -397,6 +691,33 @@ public class ZKOperator { } } + /** + * Deletes the segments to be uploaded if either one of the criteria is qualified: + * 1) the uploadStartTime matches with the one persisted in ZK metadata. + * 2) enableParallelPushProtection is not enabled. + */ + private void deleteSegmentsIfNeeded(String tableNameWithType, List<String> segmentNames, + long currentSegmentUploadStartTime, boolean enableParallelPushProtection) { + List<String> segmentsToDelete = new ArrayList<>(); + for (String segmentName: segmentNames) { + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord == null) { + continue; + } + // Check if the upload start time is set by this thread itself, if yes delete the segment. + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + LOGGER.info("Parallel push protection is {} for segment: {}.", + (enableParallelPushProtection ? 
"enabled" : "disabled"), segmentName); + if (!enableParallelPushProtection || currentSegmentUploadStartTime == existingSegmentUploadStartTime) { + segmentsToDelete.add(segmentName); + } + } + _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete); + LOGGER.info("Deleted zk entry and segments {} for table {}.", segmentsToDelete, tableNameWithType); + } + private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType, File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI) throws Exception { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2b835faaae..8748a021b3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2327,6 +2327,84 @@ public class PinotHelixResourceManager { } } + // Assign a list of segments in batch mode + public void assignTableSegments(String tableNameWithType, List<String> segmentNames) { + Map<String, String> segmentZKMetadataPathMap = new HashMap<>(); + for (String segmentName: segmentNames) { + String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, + segmentName); + segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); + } + // Assign instances for the segment and add it into IdealState + try { + TableConfig tableConfig = getTableConfig(tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); + + Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = + fetchOrComputeInstancePartitions(tableNameWithType, tableConfig); + + // Initialize tier information only in case direct tier assignment is configured + if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { + List<Tier> sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), + TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); + for (String segmentName: segmentNames) { + // Update segment tier to support direct assignment for multiple data directories + updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); + InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( + tableNameWithType, segmentName, sortedTiers, _helixZkManager); + if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + // Override instance partitions for offline table + LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", + tierInstancePartitions, segmentName, tableNameWithType); + instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions); + } + } + } + + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); + synchronized (getTableUpdaterLock(tableNameWithType)) { + long segmentAssignmentStartTs = System.currentTimeMillis(); + Map<InstancePartitionsType, InstancePartitions> finalInstancePartitionsMap = instancePartitionsMap; + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { + assert 
idealState != null; + for (String segmentName: segmentNames) { + Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List<String> assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + SegmentStateModel.ONLINE)); + } + } + return idealState; + }); + LOGGER.info("Added segments: {} to IdealState for table: {} in {} ms", segmentNames, tableNameWithType, + System.currentTimeMillis() - segmentAssignmentStartTs); + } + } catch (Exception e) { + LOGGER.error( + "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata", + segmentNames, tableNameWithType, e); + for (Map.Entry<String, String> segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) { + String segmentName = segmentZKMetadataPathEntry.getKey(); + String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue(); + if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) { + LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); + } else { + LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName, + tableNameWithType); + } + } + throw e; + } + } + private Map<InstancePartitionsType, InstancePartitions> fetchOrComputeInstancePartitions(String tableNameWithType, TableConfig tableConfig) { if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java similarity index 78% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java index 35fdae8448..4729cd282f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/segupload/SegmentUploadIntegrationTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.integration.tests; +package org.apache.pinot.integration.tests.segupload; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Function; @@ -27,7 +27,10 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.integration.tests.BaseClusterIntegrationTest; +import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils; import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner; import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner; import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner; @@ -59,6 +62,7 @@ import org.testng.annotations.Test; * todo: add test for URI push */ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { + private static String _tableNameSuffix; @Override protected Map<String, String> getStreamConfigs() { @@ -93,6 +97,7 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { @BeforeMethod public void setUpTest() throws IOException { + _tableNameSuffix = RandomStringUtils.randomAlphabetic(12); TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); } @@ -136,15 +141,15 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); TableSpec tableSpec = new TableSpec(); - tableSpec.setTableName(DEFAULT_TABLE_NAME); - tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME)); + tableSpec.setTableName(getTableName() + "_OFFLINE"); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); jobSpec.setTableSpec(tableSpec); PinotClusterSpec clusterSpec = new PinotClusterSpec(); clusterSpec.setControllerURI(getControllerBaseApiUrl()); jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); File dataDir = new File(_controllerConfig.getDataDir()); - File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + File dataDirSegments = new File(dataDir, getTableName()); // Not present in dataDir, only present in sourceDir Assert.assertFalse(dataDirSegments.exists()); @@ -204,6 +209,78 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { testCountStar(numDocs); } + @Test + public void testUploadMultipleSegmentsInBatchModeAndQuery() + throws Exception { + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig offlineTableConfig = createOfflineTableConfig(); + waitForEVToDisappear(offlineTableConfig.getTableName()); + addTableConfig(offlineTableConfig); + + List<File> avroFiles = getAllAvroFiles(); + int numSegments = 12; + + // Create the list of segments + for (int segNum = 0; segNum < numSegments; segNum++) { + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(segNum % 12), offlineTableConfig, schema, + "_seg" + segNum, _segmentDir, _tarDir); + } + + SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner(); + SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setCopyToDeepStoreForMetadataPush(true); + // enable batch mode + pushJobSpec.setBatchMode(true); + 
jobSpec.setPushJobSpec(pushJobSpec); + PinotFSSpec fsSpec = new PinotFSSpec(); + fsSpec.setScheme("file"); + fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS"); + jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); + jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(getTableName() + "_OFFLINE"); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); + jobSpec.setTableSpec(tableSpec); + PinotClusterSpec clusterSpec = new PinotClusterSpec(); + clusterSpec.setControllerURI(getControllerBaseApiUrl()); + jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); + + File dataDir = new File(_controllerConfig.getDataDir()); + File dataDirSegments = new File(dataDir, getTableName()); + + // Not present in dataDir, only present in sourceDir + Assert.assertFalse(dataDirSegments.exists()); + Assert.assertEquals(_tarDir.listFiles().length, numSegments); + + runner.init(jobSpec); + runner.run(); + + // Segment should be seen in dataDir + Assert.assertTrue(dataDirSegments.exists()); + Assert.assertEquals(dataDirSegments.listFiles().length, numSegments); + Assert.assertEquals(_tarDir.listFiles().length, numSegments); + + // test segment loaded + JsonNode segmentsList = getSegmentsList(); + Assert.assertEquals(segmentsList.size(), numSegments); + long numDocs = 0; + for (JsonNode segmentName: segmentsList) { + numDocs += getNumDocs(segmentName.asText()); + } + testCountStar(numDocs); + + // Clear segment and tar dir + for (File segment : _segmentDir.listFiles()) { + FileUtils.deleteQuietly(segment); + } + for (File tar : _tarDir.listFiles()) { + FileUtils.deleteQuietly(tar); + } + } + /** * Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner while enabling consistent data push. * Checks that segments are properly loaded and segment lineage entry were also in expected states. @@ -237,15 +314,15 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); TableSpec tableSpec = new TableSpec(); - tableSpec.setTableName(DEFAULT_TABLE_NAME); - tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME)); + tableSpec.setTableName(getTableName() + "_OFFLINE"); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); jobSpec.setTableSpec(tableSpec); PinotClusterSpec clusterSpec = new PinotClusterSpec(); clusterSpec.setControllerURI(getControllerBaseApiUrl()); jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); File dataDir = new File(_controllerConfig.getDataDir()); - File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + File dataDirSegments = new File(dataDir, getTableName()); Assert.assertEquals(_tarDir.listFiles().length, 1); @@ -268,7 +345,7 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { // Fetch segment lineage entry after running segment metadata push with consistent push enabled. String segmentLineageResponse = ControllerTest.sendGetRequest( ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()) - .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())); + .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString())); // Segment lineage should be in completed state. 
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\"")); // SegmentsFrom should be empty as we started with a blank table. @@ -317,7 +394,7 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { // Fetch segment lineage entry after running segment tar push with consistent push enabled. segmentLineageResponse = ControllerTest.sendGetRequest( ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()) - .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())); + .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString())); // Segment lineage should be in completed state. Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\"")); // SegmentsFrom should contain the previous segment @@ -337,14 +414,14 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { private long getNumDocs(String segmentName) throws IOException { return JsonUtils.stringToJsonNode( - sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName))) + sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(), segmentName))) .get("segment.total.docs").asLong(); } private JsonNode getSegmentsList() throws IOException { return JsonUtils.stringToJsonNode(sendGetRequest( - _controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()))) + _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString()))) .get(0).get("OFFLINE"); } @@ -362,6 +439,11 @@ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true); } + @Override + public String getTableName() { + return DEFAULT_TABLE_NAME + _tableNameSuffix; + } + @AfterMethod public void tearDownTest() throws IOException { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 8b0963578e..d997f8f0cc 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -179,14 +180,14 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe _pinotTaskConfig = pinotTaskConfig; _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId()); String taskType = pinotTaskConfig.getTaskType(); - Map<String, String> configs = pinotTaskConfig.getConfigs(); - String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); - String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY); + Map<String, String> taskConfigs = pinotTaskConfig.getConfigs(); + String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY); + String inputSegmentNames = taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY); String[] segmentNames = 
inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR); - String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); - String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY); String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR); - AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN)); + AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN)); LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL); File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID()); @@ -274,6 +275,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults); preUploadSegments(segmentUploadContext); + Map<String, String> segmentUriToTarPathMap = new HashMap<>(); + PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs); // Upload the tarred segments for (int i = 0; i < numOutputSegments; i++) { @@ -282,51 +285,60 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe String resultSegmentName = segmentConversionResult.getSegmentName(); _eventObserver.notifyProgress(_pinotTaskConfig, String.format("Uploading segment: %s (%d out of %d)", resultSegmentName, (i + 1), numOutputSegments)); - - // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata - SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = - getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult); - Header segmentZKMetadataCustomMapModifierHeader = - new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, - segmentZKMetadataCustomMapModifier.toJsonString()); - - String pushMode = - configs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); + String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, + BatchConfigProperties.SegmentPushType.TAR.name()); URI outputSegmentTarURI; if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) != BatchConfigProperties.SegmentPushType.TAR) { - outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile); + outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile); LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile, outputSegmentTarURI); } else { outputSegmentTarURI = convertedTarredSegmentFile.toURI(); } - List<Header> httpHeaders = new ArrayList<>(); - httpHeaders.add(segmentZKMetadataCustomMapModifierHeader); - httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); - + // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata + List<Header> httpHeaders = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults); // Set parameters for upload request - NameValuePair enableParallelPushProtectionParameter = - new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, 
"true"); - NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, - TableNameBuilder.extractRawTableName(tableNameWithType)); - NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, - TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString()); + List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType); + // RealtimeToOfflineSegmentsTask pushed segments to the corresponding offline table // TODO: This is not clean to put the override here, but let's think about it harder to see what is the proper // way to override it. if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) { - tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, - TableType.OFFLINE.toString()); + Iterator<NameValuePair> paramItr = parameters.iterator(); + while (paramItr.hasNext()) { + NameValuePair nameValuePair = paramItr.next(); + if (FileUploadDownloadClient.QueryParameters.TABLE_TYPE.equals(nameValuePair.getName())) { + paramItr.remove(); + parameters.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, + TableType.OFFLINE.toString())); + break; + } + } + } + + if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) + == BatchConfigProperties.SegmentPushType.METADATA) { + updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult, + segmentUriToTarPathMap, pushJobSpec); + } else { + pushSegment(taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult); + if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + } } - List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, - tableTypeParameter); + } - pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters, - segmentConversionResult); - if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { - LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + if (!segmentUriToTarPathMap.isEmpty()) { + // For metadata push, push all segments in batch mode + pushJobSpec.setBatchMode(true); + pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, segmentUriToTarPathMap, pushJobSpec, authProvider, + segmentConversionResults); + for (File convertedTarredSegmentFile: tarredSegmentFiles) { + if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + } } } @@ -335,9 +347,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName) .collect(Collectors.joining(",")); postProcess(pinotTaskConfig); - LOGGER - .info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType, tableNameWithType, - inputSegmentNames, outputSegmentNames); + LOGGER.info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType, + tableNameWithType, inputSegmentNames, outputSegmentNames); return segmentConversionResults; } finally { @@ -345,50 +356,107 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe } } - private 
void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI, - List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult) - throws Exception { - String pushMode = - taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); - LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + private void updateSegmentUriToTarPathMap(Map<String, String> taskConfigs, URI outputSegmentTarURI, + SegmentConversionResult segmentConversionResult, Map<String, String> segmentUriToTarPathMap, + PushJobSpec pushJobSpec) { + String segmentName = segmentConversionResult.getSegmentName(); + if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { + throw new RuntimeException(String.format("Output dir URI missing for metadata push while processing segment: %s", + segmentName)); + } + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + Map<String, String> localSegmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, + new String[]{outputSegmentTarURI.toString()}); + if (!localSegmentUriToTarPathMap.isEmpty()) { + segmentUriToTarPathMap.putAll(localSegmentUriToTarPathMap); + } + } + private PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) { PushJobSpec pushJobSpec = new PushJobSpec(); pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS); pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM); pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS); pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + return pushJobSpec; + } - SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec); - - switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { - case TAR: - File tarFile = new File(outputSegmentTarURI); - String segmentName = segmentConversionResult.getSegmentName(); - String tableNameWithType = segmentConversionResult.getTableNameWithType(); - String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); - SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, - uploadURL, tarFile); - break; - case METADATA: - if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { - URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); - try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { - Map<String, String> segmentUriToTarPathMap = - SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, - new String[]{outputSegmentTarURI.toString()}); - SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); - } - } else { - throw new RuntimeException("Output dir URI missing for metadata push"); - } - break; - default: - throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); + private List<Header> getSegmentPushCommonHeaders(PinotTaskConfig pinotTaskConfig, AuthProvider authProvider, + List<SegmentConversionResult> segmentConversionResults) { + SegmentConversionResult segmentConversionResult; + if (segmentConversionResults.size() == 1) { + segmentConversionResult = 
segmentConversionResults.get(0); + } else { + // Setting to null as the base method expects a single object. This is ok for now, since the + // segmentConversionResult is not made use of while generating the customMap. + segmentConversionResult = null; + } + SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = + getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult); + Header segmentZKMetadataCustomMapModifierHeader = + new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, + segmentZKMetadataCustomMapModifier.toJsonString()); + + List<Header> headers = new ArrayList<>(); + headers.add(segmentZKMetadataCustomMapModifierHeader); + headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); + return headers; + } + + private List<NameValuePair> getSegmentPushCommonParams(String tableNameWithType) { + List<NameValuePair> params = new ArrayList<>(); + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, + "true")); + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, + TableNameBuilder.extractRawTableName(tableNameWithType))); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != null) { + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.toString())); + } else { + throw new RuntimeException(String.format("Failed to determine the tableType from name: %s", tableNameWithType)); + } + return params; + } + + private void pushSegments(String tableNameWithType, Map<String, String> taskConfigs, PinotTaskConfig pinotTaskConfig, + Map<String, String> segmentUriToTarPathMap, PushJobSpec pushJobSpec, + AuthProvider authProvider, List<SegmentConversionResult> segmentConversionResults) + throws Exception { + String tableName = TableNameBuilder.extractRawTableName(tableNameWithType); + SegmentGenerationJobSpec spec = generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec); + + List<Header> headers = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults); + List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType); + + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { + SegmentPushUtils.sendSegmentsUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); + } + } + + private void pushSegment(Map<String, String> taskConfigs, URI outputSegmentTarURI, + List<Header> headers, List<NameValuePair> parameters, SegmentConversionResult segmentConversionResult) + throws Exception { + String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, + BatchConfigProperties.SegmentPushType.TAR.name()); + LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + + if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) + == BatchConfigProperties.SegmentPushType.TAR) { + File tarFile = new File(outputSegmentTarURI); + String segmentName = segmentConversionResult.getSegmentName(); + String tableNameWithType = segmentConversionResult.getTableNameWithType(); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, uploadURL, + 
tarFile); + } else { + throw new UnsupportedOperationException("Unrecognized push mode: " + pushMode); } } - private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs, + private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String tableName, Map<String, String> taskConfigs, PushJobSpec pushJobSpec) { TableSpec tableSpec = new TableSpec(); @@ -416,7 +484,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists( outputSegmentTarURI)) { throw new RuntimeException(String.format("Output file: %s already exists. " - + "Set 'overwriteOutput' to true to ignore this error", outputSegmentTarURI)); + + "Set 'overwriteOutput' to true to ignore this error", outputSegmentTarURI)); } else { outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 4a5dd21948..cc3f008771 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -359,6 +359,136 @@ public class SegmentPushUtils implements Serializable { } } + public static void sendSegmentsUriAndMetadata(SegmentGenerationJobSpec spec, PinotFS fileSystem, + Map<String, String> segmentUriToTarPathMap, List<Header> headers, List<NameValuePair> parameters) + throws Exception { + String tableName = spec.getTableSpec().getTableName(); + Map<String, File> segmentMetadataFileMap = new HashMap<>(); + List<String> segmentURIs = new ArrayList<>(); + LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", segmentUriToTarPathMap, + Arrays.toString(spec.getPinotClusterSpecs()), tableName); + for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { + String tarFilePath = segmentUriToTarPathMap.get(segmentUriPath); + String fileName = new File(tarFilePath).getName(); + // segments stored in Pinot deep store do not have .tar.gz extension + String segmentName = fileName.endsWith(Constants.TAR_GZ_FILE_EXT) + ? fileName.substring(0, fileName.length() - Constants.TAR_GZ_FILE_EXT.length()) : fileName; + SegmentNameUtils.validatePartialOrFullSegmentName(segmentName); + File segmentMetadataFile; + // Check if there is a segment metadata tar gz file named `segmentName.metadata.tar.gz`, already in the remote + // directory. This is to avoid generating a new segment metadata tar gz file every time we push a segment, + // which requires downloading the entire segment tar gz file. 
+ + URI metadataTarGzFilePath = generateSegmentMetadataURI(tarFilePath, segmentName); + LOGGER.info("Checking if metadata tar gz file {} exists", metadataTarGzFilePath); + if (spec.getPushJobSpec().isPreferMetadataTarGz() && fileSystem.exists(metadataTarGzFilePath)) { + segmentMetadataFile = new File(FileUtils.getTempDirectory(), + "segmentMetadata-" + UUID.randomUUID() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + if (segmentMetadataFile.exists()) { + FileUtils.forceDelete(segmentMetadataFile); + } + fileSystem.copyToLocalFile(metadataTarGzFilePath, segmentMetadataFile); + } else { + segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); + } + segmentMetadataFileMap.put(segmentName, segmentMetadataFile); + segmentURIs.add(segmentName); + segmentURIs.add(segmentUriPath); + } + + // FIXME: Move this to a separate method + String uuid = UUID.randomUUID().toString(); + File allSegmentsMetadataDir = new File(FileUtils.getTempDirectory(), "allSegmentsMetadataDir-" + uuid); + FileUtils.forceMkdir(allSegmentsMetadataDir); + for (Map.Entry<String, File> segmentMetadataTarFileEntry: segmentMetadataFileMap.entrySet()) { + String segmentName = segmentMetadataTarFileEntry.getKey(); + File tarFile = segmentMetadataTarFileEntry.getValue(); + TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.MetadataKeys.METADATA_FILE_NAME, + new File(allSegmentsMetadataDir, segmentName + "." + V1Constants.MetadataKeys.METADATA_FILE_NAME)); + TarGzCompressionUtils.untarOneFile(tarFile, V1Constants.SEGMENT_CREATION_META, + new File(allSegmentsMetadataDir, segmentName + "." + V1Constants.SEGMENT_CREATION_META)); + } + File allSegmentsMetadataTarFile = new File(FileUtils.getTempDirectory(), "allSegmentsMetadataTar-" + uuid + + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION); + if (allSegmentsMetadataTarFile.exists()) { + FileUtils.forceDelete(allSegmentsMetadataTarFile); + } + // Add a file which contains the download URI of all the segments + File segmentsURIFile = new File(allSegmentsMetadataDir, "all_segments_metadata"); + FileUtils.writeLines(segmentsURIFile, segmentURIs); + + TarGzCompressionUtils.createTarGzFile(allSegmentsMetadataDir, allSegmentsMetadataTarFile); + Map<String, File> allSegmentsMetadataMap = new HashMap<>(); + allSegmentsMetadataMap.put("allSegments", allSegmentsMetadataTarFile); + + // perform metadata push in batch mode for every cluster + try { + for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { + URI controllerURI; + try { + controllerURI = new URI(pinotClusterSpec.getControllerURI()); + } catch (URISyntaxException e) { + throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'"); + } + LOGGER.info("Pushing segments: {} to Pinot cluster: {} for table {}", + segmentMetadataFileMap.keySet(), controllerURI, tableName); + int attempts = 1; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) { + attempts = spec.getPushJobSpec().getPushAttempts(); + } + long retryWaitMs = 1000L; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) { + retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis(); + } + RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { + List<Header> reqHttpHeaders = new ArrayList<>(headers); + try { + addHeaders(spec, reqHttpHeaders); + URI segmentUploadURI = getSegmentUploadURI(controllerURI); + SimpleHttpResponse response = 
FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI, + allSegmentsMetadataMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); + LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName, + segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse()); + return true; + } catch (HttpErrorStatusException e) { + int statusCode = e.getStatusCode(); + if (statusCode >= 500) { + // Temporary exception + LOGGER.warn("Caught temporary exception while pushing table: {} segments: {} to {}, will retry", + tableName, segmentMetadataFileMap.keySet(), controllerURI, e); + return false; + } else { + // Permanent exception + LOGGER.error("Caught permanent exception while pushing table: {} segments: {} to {}, won't retry", + tableName, segmentMetadataFileMap.keySet(), controllerURI, e); + throw e; + } + } + }); + } + } finally { + for (Map.Entry<String, File> metadataFileEntry: segmentMetadataFileMap.entrySet()) { + FileUtils.deleteQuietly(metadataFileEntry.getValue()); + } + FileUtils.deleteDirectory(allSegmentsMetadataDir); + FileUtils.forceDelete(allSegmentsMetadataTarFile); + } + } + + private static URI getSegmentUploadURI(URI controllerURI) + throws URISyntaxException { + return FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI); + } + + private static void addHeaders(SegmentGenerationJobSpec jobSpec, List<Header> headers) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString())); + if (jobSpec.getPushJobSpec() != null) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, + String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush()))); + } + } + public static Map<String, String> getSegmentUriToTarPathMap(URI outputDirURI, PushJobSpec pushSpec, String[] files) { Map<String, String> segmentUriToTarPathMap = new HashMap<>(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java index 31d1ce8448..2b9237e5fd 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java @@ -46,6 +46,13 @@ public class PushJobSpec implements Serializable { * If true, and if segment was not already in the deep store, move it to deep store. */ private boolean _copyToDeepStoreForMetadataPush; + + /** + * Applicable for METADATA push type. + * If true, multiple segment metadata files are uploaded to the controller in a single call. + */ + private boolean _batchMode; + /** * Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller. 
* The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix} @@ -148,4 +155,12 @@ public class PushJobSpec implements Serializable { public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) { _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush; } + + public boolean isBatchMode() { + return _batchMode; + } + + public void setBatchMode(boolean batchMode) { + _batchMode = batchMode; + } }
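For reference, the new batch mode is driven entirely by PushJobSpec: the integration test in this patch enables it with setBatchMode(true) before running SegmentMetadataPushJobRunner, and SegmentPushUtils.sendSegmentsUriAndMetadata then bundles the metadata of all segments into a single upload to the controller. The sketch below mirrors that test as a standalone job; it is a minimal illustration, not the definitive way to configure a push job. The table name "myTable_OFFLINE", the controller URL, and the output directory are hypothetical placeholders, the imports are the same classes the test uses, and fields the test also sets (such as the table config URI) are omitted on the assumption the push runner does not need them.

import com.google.common.collect.Lists;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;

public class BatchMetadataPushSketch {
  public static void main(String[] args)
      throws Exception {
    // Enable the new batch mode so the metadata of all segments is sent to the controller in one call.
    PushJobSpec pushJobSpec = new PushJobSpec();
    pushJobSpec.setBatchMode(true);
    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);

    // Local filesystem spec, as in the integration test.
    PinotFSSpec fsSpec = new PinotFSSpec();
    fsSpec.setScheme("file");
    fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");

    // Hypothetical table name and controller URL.
    TableSpec tableSpec = new TableSpec();
    tableSpec.setTableName("myTable_OFFLINE");
    PinotClusterSpec clusterSpec = new PinotClusterSpec();
    clusterSpec.setControllerURI("http://localhost:9000");

    // Assemble the job spec; the output dir is assumed to already hold the segment .tar.gz files.
    SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
    jobSpec.setPushJobSpec(pushJobSpec);
    jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
    jobSpec.setOutputDirURI("/tmp/pinot-segment-tars");
    jobSpec.setTableSpec(tableSpec);
    jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});

    // Run the metadata push; with batch mode on, all segments go up in a single request.
    SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
    runner.init(jobSpec);
    runner.run();
  }
}

Since _batchMode defaults to false, existing metadata push jobs should keep the current one-request-per-segment behavior unless they explicitly opt in.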