This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 159afb7820 Cleanup segment upload logic and allow validation on real-time table (#8695) 159afb7820 is described below commit 159afb7820c30f5eadd75a973ccdc1d99f4e2539 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon May 16 11:14:34 2022 -0700 Cleanup segment upload logic and allow validation on real-time table (#8695) - Cleanup the segment upload logic and add more checks on the parameters - Refactor the segment validator to a util class to make it work on both metadata push and real-time table - Refine the return code documentation of the segment upload API --- .../org/apache/pinot/common/utils/URIUtils.java | 14 +- .../PinotSegmentUploadDownloadRestletResource.java | 262 +++++++++++---------- .../api/upload/SegmentValidationUtils.java | 94 ++++++++ .../controller/api/upload/SegmentValidator.java | 122 ---------- .../pinot/controller/api/upload/ZKOperator.java | 116 ++++----- .../helix/core/PinotHelixResourceManager.java | 12 - .../realtime/PinotLLCRealtimeSegmentManager.java | 16 +- .../helix/core/util/ZKMetadataUtils.java | 12 +- .../controller/api/upload/ZKOperatorTest.java | 40 ++-- .../helix/core/realtime/SegmentCompletionTest.java | 10 +- 10 files changed, 333 insertions(+), 365 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java index b2617a3972..042427b772 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java @@ -34,11 +34,9 @@ public class URIUtils { } /** - * Returns the URI for the given base path and optional parts, appends the local (file) scheme to the URI if no - * scheme exists. All the parts will be appended to the base path with the file separator. 
+ * Returns the URI for the given path, appends the local (file) scheme to the URI if no scheme exists. */ - public static URI getUri(String basePath, String... parts) { - String path = getPath(basePath, parts); + public static URI getUri(String path) { try { URI uri = new URI(path); if (uri.getScheme() != null) { @@ -51,6 +49,14 @@ public class URIUtils { } } + /** + * Returns the URI for the given base path and optional parts, appends the local (file) scheme to the URI if no + * scheme exists. All the parts will be appended to the base path with the file separator. + */ + public static URI getUri(String basePath, String... parts) { + return getUri(getPath(basePath, parts)); + } + /** * Returns the path for the given base path and optional parts. All the parts will be appended to the base path with * the file separator. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 16f2bf1865..6739bb9cc5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -18,8 +18,8 @@ */ package org.apache.pinot.controller.api.resources; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -59,6 +59,7 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; import 
org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ControllerMeter; @@ -74,12 +75,13 @@ import org.apache.pinot.controller.api.access.AccessControlFactory; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; -import org.apache.pinot.controller.api.upload.SegmentValidator; +import org.apache.pinot.controller.api.upload.SegmentValidationUtils; import org.apache.pinot.controller.api.upload.ZKOperator; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.metadata.DefaultMetadataExtractor; import org.apache.pinot.core.metadata.MetadataExtractorFactory; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.crypt.PinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; @@ -186,50 +188,78 @@ public class PinotSegmentUploadDownloadRestletResource { return builder.build(); } - private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, FormDataMultiPart multiPart, - boolean enableParallelPushProtection, HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation, - boolean allowRefresh) { - String uploadTypeStr = null; - String crypterClassNameInHeader = null; - String downloadUri = null; - String ingestionDescriptor = null; - if (headers != null) { - extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER); - extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER); - ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); - uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); - 
crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); - downloadUri = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); + private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType, + @Nullable FormDataMultiPart multiPart, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection, + boolean allowRefresh, HttpHeaders headers, Request request) { + if (StringUtils.isNotEmpty(tableName)) { + TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); + if (tableTypeFromTableName != null && tableTypeFromTableName != tableType) { + throw new ControllerApplicationException(LOGGER, + String.format("Table name: %s does not match table type: %s", tableName, tableType), + Response.Status.BAD_REQUEST); + } } + + // TODO: Consider validating the segment name and table name from the header against the actual segment + extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER); + extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER); + + String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); + String downloadURI = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); + String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); + String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); + File tempEncryptedFile = null; File tempDecryptedFile = null; File tempSegmentDir = null; try { ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID(); - tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName); tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX); + tempDecryptedFile = new 
File(provider.getFileUploadTempDir(), tempFileName); tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName); - boolean uploadedSegmentIsEncrypted = !Strings.isNullOrEmpty(crypterClassNameInHeader); + boolean uploadedSegmentIsEncrypted = StringUtils.isNotEmpty(crypterClassNameInHeader); FileUploadDownloadClient.FileUploadType uploadType = getUploadType(uploadTypeStr); - File dstFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile; + File destFile = uploadedSegmentIsEncrypted ? tempEncryptedFile : tempDecryptedFile; long segmentSizeInBytes; switch (uploadType) { - case URI: - downloadSegmentFileFromURI(downloadUri, dstFile, tableName); - segmentSizeInBytes = dstFile.length(); - break; case SEGMENT: - createSegmentFileFromMultipart(multiPart, dstFile); - segmentSizeInBytes = dstFile.length(); + if (multiPart == null) { + throw new ControllerApplicationException(LOGGER, + "Segment file (as multipart/form-data) is required for SEGMENT upload mode", + Response.Status.BAD_REQUEST); + } + if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) { + throw new ControllerApplicationException(LOGGER, + "Download URI is required if segment should not be copied to the deep store", + Response.Status.BAD_REQUEST); + } + createSegmentFileFromMultipart(multiPart, destFile); + segmentSizeInBytes = destFile.length(); + break; + case URI: + if (StringUtils.isEmpty(downloadURI)) { + throw new ControllerApplicationException(LOGGER, "Download URI is required for URI upload mode", + Response.Status.BAD_REQUEST); + } + downloadSegmentFileFromURI(downloadURI, destFile, tableName); + segmentSizeInBytes = destFile.length(); break; case METADATA: + if (multiPart == null) { + throw new ControllerApplicationException(LOGGER, + "Segment metadata file (as multipart/form-data) is required for METADATA upload mode", + Response.Status.BAD_REQUEST); + } + if (StringUtils.isEmpty(downloadURI)) { + throw new ControllerApplicationException(LOGGER, 
"Download URI is required for METADATA upload mode", + Response.Status.BAD_REQUEST); + } moveSegmentToFinalLocation = false; - Preconditions.checkState(downloadUri != null, "Download URI is required in segment metadata upload mode"); - createSegmentFileFromMultipart(multiPart, dstFile); + createSegmentFileFromMultipart(multiPart, destFile); try { - URI segmentURI = new URI(downloadUri); + URI segmentURI = new URI(downloadURI); PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme()); segmentSizeInBytes = pinotFS.length(segmentURI); } catch (Exception e) { @@ -238,7 +268,8 @@ public class PinotSegmentUploadDownloadRestletResource { } break; default: - throw new UnsupportedOperationException("Unsupported upload type: " + uploadType); + throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadType, + Response.Status.BAD_REQUEST); } if (uploadedSegmentIsEncrypted) { @@ -253,67 +284,70 @@ public class PinotSegmentUploadDownloadRestletResource { // Fetch table name. 
Try to derive the table name from the parameter and then from segment metadata String rawTableName; - if (tableName != null && !tableName.isEmpty()) { + if (StringUtils.isNotEmpty(tableName)) { rawTableName = TableNameBuilder.extractRawTableName(tableName); - LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from API parameter)", segmentName, - tableName, uploadType); } else { // TODO: remove this when we completely deprecate the table name from segment metadata rawTableName = segmentMetadata.getTableName(); - LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName, - tableName, uploadType); + LOGGER.warn("Table name is not provided when uploading segment: {} for table: {}", segmentName, rawTableName); } - String tableNameWithType; if (tableType == TableType.OFFLINE) { tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); } else { - if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) { - throw new UnsupportedOperationException( - "Upload segment to non-upsert realtime table is not supported " + rawTableName); - } tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + if (!_pinotHelixResourceManager.isUpsertTable(tableNameWithType)) { + throw new ControllerApplicationException(LOGGER, + "Cannot upload segment to non-upsert real-time table: " + tableNameWithType, Response.Status.FORBIDDEN); + } } String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName(); - LOGGER.info("Processing upload request for segment: {} of table: {} from client: {}, ingestion descriptor: {}", - segmentName, tableNameWithType, clientAddress, ingestionDescriptor); - - // Skip segment validation if upload is to an offline table and only segment metadata. Skip segment validation for - // realtime tables because the feature is experimental and only applicable to upsert enabled table currently. 
- if (tableType == TableType.OFFLINE && uploadType != FileUploadDownloadClient.FileUploadType.METADATA) { - // Validate segment - new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager, - _controllerMetrics, _leadControllerManager.isLeaderForTable(tableNameWithType)) - .validateOfflineSegment(tableNameWithType, segmentMetadata, tempSegmentDir); + LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, " + + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, ingestionDescriptor); + + // Validate segment + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType, + Response.Status.BAD_REQUEST); + } + SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig); + if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) { + SegmentValidationUtils.checkStorageQuota(tempSegmentDir, segmentMetadata, tableConfig, + _pinotHelixResourceManager, _controllerConf, _controllerMetrics, _connectionManager, _executor, + _leadControllerManager.isLeaderForTable(tableNameWithType)); } // Encrypt segment - String crypterClassNameInTableConfig = - _pinotHelixResourceManager.getCrypterClassNameFromTableConfig(tableNameWithType); + String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName(); Pair<String, File> encryptionInfo = encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, uploadedSegmentIsEncrypted, - crypterClassNameInHeader, crypterClassNameInTableConfig, segmentName, tableNameWithType); - - String crypterClassName = encryptionInfo.getLeft(); - File finalSegmentFile = encryptionInfo.getRight(); - - // ZK download URI - String zkDownloadUri; - // This boolean is here for V1 segment upload, where we keep the segment in the downloadURI sent in 
the header. - // We will deprecate this behavior eventually. - if (!moveSegmentToFinalLocation) { - LOGGER - .info("Setting zkDownloadUri: to {} for segment: {} of table: {}, skipping move", downloadUri, segmentName, - tableNameWithType); - zkDownloadUri = downloadUri; - } else { - zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName, segmentName); + crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType); + + String crypterName = encryptionInfo.getLeft(); + File segmentFile = encryptionInfo.getRight(); + + // Update download URI if controller is responsible for moving the segment to the deep store + URI finalSegmentLocationURI = null; + if (moveSegmentToFinalLocation) { + URI dataDirURI = provider.getDataDirURI(); + String dataDirPath = dataDirURI.toString(); + String encodedSegmentName = URIUtils.encode(segmentName); + String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName); + if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { + downloadURI = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName); + } else { + downloadURI = finalSegmentLocationPath; + } + finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath); } + LOGGER.info("Using download URI: {} for segment: {} of table: {} (move segment: {})", downloadURI, segmentFile, + tableNameWithType, moveSegmentToFinalLocation); - // Zk operations - completeZkOperations(enableParallelPushProtection, headers, finalSegmentFile, tableNameWithType, segmentMetadata, - segmentName, zkDownloadUri, moveSegmentToFinalLocation, crypterClassName, allowRefresh, segmentSizeInBytes); + ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); + zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, + downloadURI, crypterName, segmentSizeInBytes, 
enableParallelPushProtection, allowRefresh, headers); return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType); } catch (WebApplicationException e) { @@ -338,16 +372,17 @@ public class PinotSegmentUploadDownloadRestletResource { return value; } + @VisibleForTesting Pair<String, File> encryptSegmentIfNeeded(File tempDecryptedFile, File tempEncryptedFile, boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, String crypterClassNameInTableConfig, String segmentName, String tableNameWithType) { - boolean segmentNeedsEncryption = !Strings.isNullOrEmpty(crypterClassNameInTableConfig); + boolean segmentNeedsEncryption = StringUtils.isNotEmpty(crypterClassNameInTableConfig); // form the output File finalSegmentFile = (isUploadedSegmentEncrypted || segmentNeedsEncryption) ? tempEncryptedFile : tempDecryptedFile; - String crypterClassName = Strings.isNullOrEmpty(crypterClassNameInTableConfig) ? crypterUsedInUploadedSegment + String crypterClassName = StringUtils.isEmpty(crypterClassNameInTableConfig) ? crypterUsedInUploadedSegment : crypterClassNameInTableConfig; ImmutablePair<String, File> out = ImmutablePair.of(crypterClassName, finalSegmentFile); @@ -371,19 +406,6 @@ public class PinotSegmentUploadDownloadRestletResource { return out; } - private String getZkDownloadURIForSegmentUpload(String rawTableName, String segmentName) { - ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); - URI dataDirURI = provider.getDataDirURI(); - if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { - return URIUtils.constructDownloadUrl(provider.getVip(), rawTableName, segmentName); - } else { - // Receiving .tar.gz segment upload for pluggable storage. Download URI is the same as final segment location. 
- String downloadUri = URIUtils.getPath(dataDirURI.toString(), rawTableName, URIUtils.encode(segmentName)); - LOGGER.info("Using download uri: {} for segment: {} of table {}", downloadUri, segmentName, rawTableName); - return downloadUri; - } - } - private void downloadSegmentFileFromURI(String currentSegmentLocationURI, File destFile, String tableName) throws Exception { if (currentSegmentLocationURI == null || currentSegmentLocationURI.isEmpty()) { @@ -406,19 +428,6 @@ public class PinotSegmentUploadDownloadRestletResource { return MetadataExtractorFactory.create(metadataProviderClass).extractMetadata(tempDecryptedFile, tempSegmentDir); } - private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File uploadedSegmentFile, - String tableNameWithType, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI, - boolean moveSegmentToFinalLocation, String crypter, boolean allowRefresh, long segmentSizeInBytes) - throws Exception { - String basePath = ControllerFilePathProvider.getInstance().getDataDirURI().toString(); - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - URI finalSegmentLocationURI = URIUtils.getUri(basePath, rawTableName, URIUtils.encode(segmentName)); - ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); - zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, - uploadedSegmentFile, enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, crypter, - allowRefresh, segmentSizeInBytes); - } - private void decryptFile(String crypterClassName, File tempEncryptedFile, File tempDecryptedFile) { PinotCrypter pinotCrypter = PinotCrypterFactory.create(crypterClassName); LOGGER.info("Using crypter class {} for decrypting {} to {}", pinotCrypter.getClass().getName(), tempEncryptedFile, @@ -435,7 +444,11 @@ public class PinotSegmentUploadDownloadRestletResource { 
@ApiOperation(value = "Upload a segment", notes = "Upload a segment as json") @ApiResponses(value = { @ApiResponse(code = 200, message = "Successfully uploaded segment"), - @ApiResponse(code = 410, message = "Segment to refresh is deleted"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), @ApiResponse(code = 500, message = "Internal error") }) // We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST @@ -453,9 +466,8 @@ public class PinotSegmentUploadDownloadRestletResource { @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh, @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { try { - asyncResponse.resume( - uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, enableParallelPushProtection, - headers, request, false, allowRefresh)); + asyncResponse.resume(uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, false, + enableParallelPushProtection, allowRefresh, headers, request)); } catch (Throwable t) { asyncResponse.resume(t); } @@ -470,7 +482,11 @@ public class PinotSegmentUploadDownloadRestletResource { @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary") @ApiResponses(value = { @ApiResponse(code = 200, message = "Successfully uploaded segment"), - @ApiResponse(code = 410, message = "Segment to refresh is deleted"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + 
@ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), @ApiResponse(code = 500, message = "Internal error") }) // For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint. @@ -486,9 +502,8 @@ public class PinotSegmentUploadDownloadRestletResource { @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh, @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { try { - asyncResponse.resume( - uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection, - headers, request, true, allowRefresh)); + asyncResponse.resume(uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, true, + enableParallelPushProtection, allowRefresh, headers, request)); } catch (Throwable t) { asyncResponse.resume(t); } @@ -503,7 +518,11 @@ public class PinotSegmentUploadDownloadRestletResource { @ApiOperation(value = "Upload a segment", notes = "Upload a segment as json") @ApiResponses(value = { @ApiResponse(code = 200, message = "Successfully uploaded segment"), - @ApiResponse(code = 410, message = "Segment to refresh is deleted"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), @ApiResponse(code = 500, message = "Internal error") }) // We use this endpoint with URI upload because a request sent with the multipart content type will reject the POST @@ -522,8 +541,8 @@ public class PinotSegmentUploadDownloadRestletResource { @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse 
asyncResponse) { try { asyncResponse.resume( - uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, enableParallelPushProtection, - headers, request, true, allowRefresh)); + uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), null, true, enableParallelPushProtection, + allowRefresh, headers, request)); } catch (Throwable t) { asyncResponse.resume(t); } @@ -538,7 +557,11 @@ public class PinotSegmentUploadDownloadRestletResource { @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary") @ApiResponses(value = { @ApiResponse(code = 200, message = "Successfully uploaded segment"), - @ApiResponse(code = 410, message = "Segment to refresh is deleted"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), @ApiResponse(code = 500, message = "Internal error") }) // This behavior does not differ from v1 of the same endpoint. 
@@ -554,9 +577,8 @@ public class PinotSegmentUploadDownloadRestletResource { @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) boolean allowRefresh, @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) { try { - asyncResponse.resume( - uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection, - headers, request, true, allowRefresh)); + asyncResponse.resume(uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, true, + enableParallelPushProtection, allowRefresh, headers, request)); } catch (Throwable t) { asyncResponse.resume(t); } @@ -581,9 +603,8 @@ public class PinotSegmentUploadDownloadRestletResource { } String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - String segmentLineageEntryId = _pinotHelixResourceManager - .startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(), - startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup); + String segmentLineageEntryId = _pinotHelixResourceManager.startReplaceSegments(tableNameWithType, + startReplaceSegmentsRequest.getSegmentsFrom(), startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup); return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build(); } catch (WebApplicationException wae) { throw wae; @@ -629,8 +650,8 @@ public class PinotSegmentUploadDownloadRestletResource { public Response revertReplaceSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, - @ApiParam(value = "Segment lineage entry id to revert", required = true) - @QueryParam("segmentLineageEntryId") String segmentLineageEntryId, + @ApiParam(value = "Segment lineage entry id to revert", required = 
true) @QueryParam("segmentLineageEntryId") + String segmentLineageEntryId, @ApiParam(value = "Force revert in case the user knows that the lineage entry is interrupted") @QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert) { try { @@ -652,7 +673,7 @@ public class PinotSegmentUploadDownloadRestletResource { } } - private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, File dstFile) + private static void createSegmentFileFromMultipart(FormDataMultiPart multiPart, File destFile) throws IOException { // Read segment file or segment metadata file and directly use that information to update zk Map<String, List<FormDataBodyPart>> segmentMetadataMap = multiPart.getFields(); @@ -662,12 +683,11 @@ public class PinotSegmentUploadDownloadRestletResource { } FormDataBodyPart segmentMetadataBodyPart = segmentMetadataMap.values().iterator().next().get(0); try (InputStream inputStream = segmentMetadataBodyPart.getValueAs(InputStream.class); - OutputStream outputStream = new FileOutputStream(dstFile)) { + OutputStream outputStream = new FileOutputStream(destFile)) { IOUtils.copyLarge(inputStream, outputStream); } finally { multiPart.cleanup(); } - return dstFile; } private FileUploadDownloadClient.FileUploadType getUploadType(String uploadTypeStr) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java new file mode 100644 index 0000000000..22f264d256 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidationUtils.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.upload; + +import java.io.File; +import java.util.concurrent.Executor; +import javax.ws.rs.core.Response; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.util.TableSizeReader; +import org.apache.pinot.controller.validation.StorageQuotaChecker; +import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.utils.TimeUtils; +import org.joda.time.Interval; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SegmentValidationUtils provides utility methods to validate the segment during segment upload. 
+ */ +public class SegmentValidationUtils { + private SegmentValidationUtils() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentValidationUtils.class); + + public static void validateTimeInterval(SegmentMetadata segmentMetadata, TableConfig tableConfig) { + Interval timeInterval = segmentMetadata.getTimeInterval(); + if (timeInterval != null) { + if (!TimeUtils.isValidTimeInterval(timeInterval)) { + throw new ControllerApplicationException(LOGGER, String.format( + "Invalid segment start/end time: %s (in millis: %d/%d) for segment: %s of table: %s, must be between: %s", + timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), segmentMetadata.getName(), + tableConfig.getTableName(), TimeUtils.VALID_TIME_INTERVAL), Response.Status.FORBIDDEN); + } + } else { + String timeColumn = tableConfig.getValidationConfig().getTimeColumnName(); + if (timeColumn != null) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to find time interval in segment: %s for table: %s with time column: %s", + segmentMetadata.getName(), tableConfig.getTableName(), timeColumn), Response.Status.FORBIDDEN); + } + } + } + + public static void checkStorageQuota(File segmentDir, SegmentMetadata segmentMetadata, TableConfig tableConfig, + PinotHelixResourceManager resourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics, + HttpConnectionManager connectionManager, Executor executor, boolean isLeaderForTable) { + if (!controllerConf.getEnableStorageQuotaCheck()) { + return; + } + TableSizeReader tableSizeReader = + new TableSizeReader(executor, connectionManager, controllerMetrics, resourceManager); + StorageQuotaChecker quotaChecker = + new StorageQuotaChecker(tableConfig, tableSizeReader, controllerMetrics, isLeaderForTable, resourceManager); + StorageQuotaChecker.QuotaCheckerResponse response; + try { + response = + quotaChecker.isSegmentStorageWithinQuota(segmentMetadata.getName(), 
FileUtils.sizeOfDirectory(segmentDir), + controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Caught exception while checking the storage quota for segment: %s of table: %s", + segmentMetadata.getName(), tableConfig.getTableName()), Response.Status.INTERNAL_SERVER_ERROR); + } + if (!response._isSegmentWithinQuota) { + throw new ControllerApplicationException(LOGGER, + String.format("Storage quota check failed for segment: %s of table: %s, reason: %s", + segmentMetadata.getName(), tableConfig.getTableName(), response._reason), Response.Status.FORBIDDEN); + } + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java deleted file mode 100644 index 3e34afdc67..0000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.controller.api.upload; - -import java.io.File; -import java.util.concurrent.Executor; -import javax.ws.rs.core.Response; -import org.apache.commons.httpclient.HttpConnectionManager; -import org.apache.commons.io.FileUtils; -import org.apache.pinot.common.exception.InvalidConfigException; -import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metrics.ControllerMetrics; -import org.apache.pinot.controller.ControllerConf; -import org.apache.pinot.controller.api.exception.ControllerApplicationException; -import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.util.TableSizeReader; -import org.apache.pinot.controller.validation.StorageQuotaChecker; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.utils.TimeUtils; -import org.joda.time.Interval; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * SegmentValidator is a util class used during segment upload. It does verification such as a quota check and - * validating - * that the segment time values stored in the segment are valid. 
- */ -public class SegmentValidator { - private static final Logger LOGGER = LoggerFactory.getLogger(SegmentValidator.class); - private final PinotHelixResourceManager _pinotHelixResourceManager; - private final ControllerConf _controllerConf; - private final Executor _executor; - private final HttpConnectionManager _connectionManager; - private final ControllerMetrics _controllerMetrics; - private final boolean _isLeaderForTable; - - public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf, - Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics, - boolean isLeaderForTable) { - _pinotHelixResourceManager = pinotHelixResourceManager; - _controllerConf = controllerConf; - _executor = executor; - _connectionManager = connectionManager; - _controllerMetrics = controllerMetrics; - _isLeaderForTable = isLeaderForTable; - } - - public void validateOfflineSegment(String offlineTableName, SegmentMetadata segmentMetadata, File tempSegmentDir) { - TableConfig offlineTableConfig = - ZKMetadataProvider.getOfflineTableConfig(_pinotHelixResourceManager.getPropertyStore(), offlineTableName); - if (offlineTableConfig == null) { - throw new ControllerApplicationException(LOGGER, "Failed to find table config for table: " + offlineTableName, - Response.Status.NOT_FOUND); - } - - String segmentName = segmentMetadata.getName(); - StorageQuotaChecker.QuotaCheckerResponse quotaResponse; - try { - quotaResponse = checkStorageQuota(tempSegmentDir, segmentMetadata, offlineTableConfig); - } catch (InvalidConfigException e) { - // Admin port is missing, return response with 500 status code. 
- throw new ControllerApplicationException(LOGGER, - "Quota check failed for segment: " + segmentName + " of table: " + offlineTableName + ", reason: " + e - .getMessage(), Response.Status.INTERNAL_SERVER_ERROR); - } - if (!quotaResponse._isSegmentWithinQuota) { - throw new ControllerApplicationException(LOGGER, - "Quota check failed for segment: " + segmentName + " of table: " + offlineTableName + ", reason: " - + quotaResponse._reason, Response.Status.FORBIDDEN); - } - - // Check time interval - // TODO: Pass in schema and check the existence of time interval when time field exists - Interval timeInterval = segmentMetadata.getTimeInterval(); - if (timeInterval != null && !TimeUtils.isValidTimeInterval(timeInterval)) { - throw new ControllerApplicationException(LOGGER, String.format( - "Invalid segment start/end time: %s (in millis: %d/%d) for segment: %s of table: %s, must be between: %s", - timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), segmentName, offlineTableName, - TimeUtils.VALID_TIME_INTERVAL), Response.Status.NOT_ACCEPTABLE); - } - } - - /** - * check if the segment represented by segmentFile is within the storage quota - * @param segmentFile untarred segment. This should not be null. - * segmentFile must exist on disk and must be a directory - * @param metadata segment metadata. This should not be null. - * @param offlineTableConfig offline table configuration. This should not be null. 
- */ - private StorageQuotaChecker.QuotaCheckerResponse checkStorageQuota(File segmentFile, SegmentMetadata metadata, - TableConfig offlineTableConfig) - throws InvalidConfigException { - if (!_controllerConf.getEnableStorageQuotaCheck()) { - return StorageQuotaChecker.success("Quota check is disabled"); - } - TableSizeReader tableSizeReader = - new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager); - StorageQuotaChecker quotaChecker = new StorageQuotaChecker(offlineTableConfig, tableSizeReader, - _controllerMetrics, _isLeaderForTable, _pinotHelixResourceManager); - return quotaChecker.isSegmentStorageWithinQuota(metadata.getName(), FileUtils.sizeOfDirectory(segmentFile), - _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index 2e7ba5c76e..7a12d9e5eb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.upload; import java.io.File; import java.net.URI; +import javax.annotation.Nullable; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.helix.ZNRecord; @@ -33,7 +34,6 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils; import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,26 +60,24 @@ public class ZKOperator { } public void completeSegmentOperations(String 
tableNameWithType, SegmentMetadata segmentMetadata, - URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection, - HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter, - boolean allowRefresh, long segmentSizeInBytes) + @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName, + long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers) throws Exception { String segmentName = segmentMetadata.getName(); - ZNRecord segmentMetadataZNRecord = + ZNRecord existingSegmentMetadataZNRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); boolean refreshOnly = Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY)); - if (segmentMetadataZNRecord == null) { + if (existingSegmentMetadataZNRecord == null) { // Add a new segment if (refreshOnly) { throw new ControllerApplicationException(LOGGER, - "Cannot refresh non-existing segment, aborted uploading segment: " + segmentName + " of table: " - + tableNameWithType, Response.Status.GONE); + String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), + Response.Status.GONE); } - LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType); - processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, headers, - crypter, tableNameWithType, segmentName, moveSegmentToFinalLocation, enableParallelPushProtection, - segmentSizeInBytes); + LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); + processNewSegment(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, downloadUrl, + crypterName, segmentSizeInBytes, enableParallelPushProtection, headers); } else { // Refresh an existing segment if (!allowRefresh) { @@ -87,26 +85,27 @@ 
public class ZKOperator { // done up-front but ends up getting created before the check here, we could incorrectly refresh an existing // segment. throw new ControllerApplicationException(LOGGER, - "Segment: " + segmentName + " already exists in table: " + tableNameWithType + ". Refresh not permitted.", - Response.Status.CONFLICT); + String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName, + tableNameWithType), Response.Status.CONFLICT); } - LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType); - processExistingSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, - enableParallelPushProtection, headers, zkDownloadURI, crypter, tableNameWithType, segmentName, - segmentMetadataZNRecord, moveSegmentToFinalLocation, segmentSizeInBytes); + LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); + processExistingSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord, + finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, segmentSizeInBytes, + enableParallelPushProtection, headers); } } - private void processExistingSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, - File currentSegmentLocation, boolean enableParallelPushProtection, HttpHeaders headers, String downloadUrl, - String crypter, String tableNameWithType, String segmentName, ZNRecord znRecord, - boolean moveSegmentToFinalLocation, long segmentSizeInBytes) + private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata, + ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile, + String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, + HttpHeaders headers) throws Exception { - SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord); - long existingCrc = 
segmentZKMetadata.getCrc(); - int expectedVersion = znRecord.getVersion(); + String segmentName = segmentMetadata.getName(); + int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); // Check if CRC match when IF-MATCH header is set + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingCrc = segmentZKMetadata.getCrc(); checkCRC(headers, tableNameWithType, segmentName, existingCrc); // Check segment upload start time when parallel push protection enabled @@ -122,8 +121,8 @@ public class ZKOperator { } else { // Another segment upload is in progress throw new ControllerApplicationException(LOGGER, - "Another segment upload is in progress for segment: " + segmentName + " of table: " + tableNameWithType - + ", retry later", Response.Status.CONFLICT); + String.format("Another segment upload is in progress for segment: %s of table: %s, retry later", + segmentName, tableNameWithType), Response.Status.CONFLICT); } } @@ -131,7 +130,7 @@ public class ZKOperator { segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis()); if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { throw new ControllerApplicationException(LOGGER, - "Failed to lock the segment: " + segmentName + " of table: " + tableNameWithType + ", retry later", + String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), Response.Status.CONFLICT); } else { // The version will increment if the zk metadata update is successful @@ -179,13 +178,10 @@ public class ZKOperator { LOGGER.info( "New segment crc {} is different than the existing segment crc {}. 
Updating ZK metadata and refreshing " + "segment {}", newCrc, existingCrc, segmentName); - if (moveSegmentToFinalLocation) { - moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI); - LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, - currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath()); - } else { - LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType, - downloadUrl); + if (finalSegmentLocationURI != null) { + moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI); + LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + finalSegmentLocationURI); } // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on @@ -196,11 +192,11 @@ public class ZKOperator { // If no modifier is provided, use the custom map from the segment metadata segmentZKMetadata.setCustomMap(null); ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, - crypter, segmentSizeInBytes); + crypterName, segmentSizeInBytes); } else { // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, - crypter, segmentSizeInBytes); + crypterName, segmentSizeInBytes); segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); } if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { @@ -222,7 +218,7 @@ public class ZKOperator { } } - private void checkCRC(HttpHeaders headers, String offlineTableName, String segmentName, long existingCrc) { + private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) { String expectedCrcStr = 
headers.getHeaderString(HttpHeaders.IF_MATCH); if (expectedCrcStr != null) { long expectedCrc; @@ -230,24 +226,24 @@ public class ZKOperator { expectedCrc = Long.parseLong(expectedCrcStr); } catch (NumberFormatException e) { throw new ControllerApplicationException(LOGGER, - "Caught exception for segment: " + segmentName + " of table: " + offlineTableName - + " while parsing IF-MATCH CRC: \"" + expectedCrcStr + "\"", Response.Status.PRECONDITION_FAILED); + String.format("Caught exception for segment: %s of table: %s while parsing IF-MATCH CRC: \"%s\"", + segmentName, tableNameWithType, expectedCrcStr), Response.Status.PRECONDITION_FAILED); } if (expectedCrc != existingCrc) { throw new ControllerApplicationException(LOGGER, - "For segment: " + segmentName + " of table: " + offlineTableName + ", expected CRC: " + expectedCrc - + " does not match existing CRC: " + existingCrc, Response.Status.PRECONDITION_FAILED); + String.format("For segment: %s of table: %s, expected CRC: %d does not match existing CRC: %d", segmentName, + tableNameWithType, expectedCrc, existingCrc), Response.Status.PRECONDITION_FAILED); } } } - private void processNewSegment(SegmentMetadata segmentMetadata, URI finalSegmentLocationURI, - File currentSegmentLocation, String zkDownloadURI, HttpHeaders headers, String crypter, String tableNameWithType, - String segmentName, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection, - long segmentSizeInBytes) + private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, + @Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName, + long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) throws Exception { + String segmentName = segmentMetadata.getName(); SegmentZKMetadata newSegmentZKMetadata = - ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, zkDownloadURI, crypter, + 
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, crypterName, segmentSizeInBytes); // Lock if enableParallelPushProtection is true. @@ -266,15 +262,14 @@ public class ZKOperator { } if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) { throw new RuntimeException( - "Failed to create ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType); + String.format("Failed to create ZK metadata for segment: %s of table: %s", segmentName, tableNameWithType)); } - // For v1 segment uploads, we will not move the segment - if (moveSegmentToFinalLocation) { + if (finalSegmentLocationURI != null) { try { - moveSegmentToPermanentDirectory(currentSegmentLocation, finalSegmentLocationURI); - LOGGER.info("Moved segment {} from temp location {} to {}", segmentName, - currentSegmentLocation.getAbsolutePath(), finalSegmentLocationURI.getPath()); + moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI); + LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType, + finalSegmentLocationURI); } catch (Exception e) { // Cleanup the Zk entry and the segment from the permanent directory if it exists. 
LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, @@ -283,9 +278,6 @@ public class ZKOperator { LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType); throw e; } - } else { - LOGGER.info("Skipping segment move, keeping segment {} from table {} at {}", segmentName, tableNameWithType, - zkDownloadURI); } try { @@ -306,18 +298,14 @@ public class ZKOperator { _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName); LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType); throw new RuntimeException( - "Failed to update ZK metadata for segment: " + segmentName + " of table: " + tableNameWithType); + String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType)); } } } - private void moveSegmentToPermanentDirectory(File currentSegmentLocation, URI finalSegmentLocationURI) + private void moveSegmentToPermanentDirectory(File segmentFile, URI finalSegmentLocationURI) throws Exception { - PinotFS pinotFS = PinotFSFactory.create(finalSegmentLocationURI.getScheme()); - - // Overwrites current segment file - LOGGER.info("Copying segment from {} to {}", currentSegmentLocation.getAbsolutePath(), - finalSegmentLocationURI.toString()); - pinotFS.copyFromLocalFile(currentSegmentLocation, finalSegmentLocationURI); + LOGGER.info("Copying segment from: {} to: {}", segmentFile.getAbsolutePath(), finalSegmentLocationURI); + PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 4c9f61fc4a..f9d0e86447 100644 --- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -689,18 +689,6 @@ public class PinotHelixResourceManager { } } - /** - * Returns the crypter class name defined in the table config for the given table. - * - * @param tableNameWithType Table name with type suffix - * @return crypter class name - */ - public String getCrypterClassNameFromTableConfig(String tableNameWithType) { - TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType); - Preconditions.checkNotNull(tableConfig, "Table config is not available for table '%s'", tableNameWithType); - return tableConfig.getValidationConfig().getCrypterClassName(); - } - /** * Table related APIs */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index cae0b48949..2d07e20be0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -459,10 +459,6 @@ public class PinotLLCRealtimeSegmentManager { public void commitSegmentFile(String realtimeTableName, CommittingSegmentDescriptor committingSegmentDescriptor) throws Exception { Preconditions.checkState(!_isStopping, "Segment manager is stopping"); - if (isPeerSegmentDownloadScheme(committingSegmentDescriptor)) { - LOGGER.info("No moving needed for segment on peer servers: {}", committingSegmentDescriptor.getSegmentLocation()); - return; - } String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); String segmentName = committingSegmentDescriptor.getSegmentName(); @@ -470,6 +466,12 @@ public class 
PinotLLCRealtimeSegmentManager { // Copy the segment file to the controller String segmentLocation = committingSegmentDescriptor.getSegmentLocation(); + Preconditions.checkArgument(segmentLocation != null, "Segment location must be provided"); + if (segmentLocation.regionMatches(true, 0, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, 0, + CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME.length())) { + LOGGER.info("No moving needed for segment on peer servers: {}", segmentLocation); + return; + } URI segmentFileURI = URIUtils.getUri(segmentLocation); URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName); URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName)); @@ -494,12 +496,6 @@ public class PinotLLCRealtimeSegmentManager { committingSegmentDescriptor.setSegmentLocation(uriToMoveTo.toString()); } - private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor committingSegmentDescriptor) { - return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null) - && committingSegmentDescriptor.getSegmentLocation().toLowerCase() - .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME); - } - /** * This method is invoked after the realtime segment is uploaded but before a response is sent to the server. 
* It updates the propertystore segment metadata from IN_PROGRESS to DONE, and also creates new propertystore diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java index 2628d9f6bd..11e67b1663 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java @@ -38,9 +38,9 @@ public class ZKMetadataUtils { * Creates the segment ZK metadata for a new segment. */ public static SegmentZKMetadata createSegmentZKMetadata(String tableNameWithType, SegmentMetadata segmentMetadata, - String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { + String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes) { SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentMetadata.getName()); - updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypter, + updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypterName, segmentSizeInBytes); segmentZKMetadata.setPushTime(System.currentTimeMillis()); return segmentZKMetadata; @@ -50,14 +50,14 @@ public class ZKMetadataUtils { * Refreshes the segment ZK metadata for a segment being replaced. 
*/ public static void refreshSegmentZKMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, - SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { - updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypter, + SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes) { + updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl, crypterName, segmentSizeInBytes); segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); } private static void updateSegmentZKMetadata(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, - SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) { + SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes) { if (segmentMetadata.getTimeInterval() != null) { segmentZKMetadata.setStartTime(segmentMetadata.getStartTime()); segmentZKMetadata.setEndTime(segmentMetadata.getEndTime()); @@ -77,7 +77,7 @@ public class ZKMetadataUtils { segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc())); segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); segmentZKMetadata.setDownloadUrl(downloadUrl); - segmentZKMetadata.setCrypterName(crypter); + segmentZKMetadata.setCrypterName(crypterName); // Set partition metadata Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java index c9a119b6a5..d4951587ce 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java @@ -43,6 +43,7 @@ 
import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; + public class ZKOperatorTest { private static final String TABLE_NAME = "operatorTestTable"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME); @@ -70,15 +71,13 @@ public class ZKOperatorTest { // Test if Zk segment metadata is removed if exception is thrown when moving segment to final location. try { - // Create mock finalSegmentLocationURI and currentSegmentLocation. + // Create mock finalSegmentLocationURI and segmentFile. URI finalSegmentLocationURI = URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, URIUtils.encode(segmentMetadata.getName())); - File currentSegmentLocation = new File(new File("foo/bar"), "mockChild"); + File segmentFile = new File(new File("foo/bar"), "mockChild"); - zkOperator - .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI, - currentSegmentLocation, true, httpHeaders, "downloadUrl", - true, "crypter", true, 10); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, finalSegmentLocationURI, segmentFile, + "downloadUrl", "crypter", 10, true, true, httpHeaders); fail(); } catch (Exception e) { // Expected @@ -91,9 +90,8 @@ public class ZKOperatorTest { return segmentZKMetadata == null; }, 30_000L, "Failed to delete segmentZkMetadata."); - zkOperator - .completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, "downloadUrl", - false, "crypter", true, 10); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "downloadUrl", "crypter", 10, + true, true, httpHeaders); SegmentZKMetadata segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); @@ -110,8 +108,8 @@ public class ZKOperatorTest { // Upload the same segment with allowRefresh = false. 
Validate that an exception is thrown. try { - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, false, httpHeaders, - "otherDownloadUrl", false, "otherCrypter", false, 10); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", + "otherCrypter", 10, true, false, httpHeaders); fail(); } catch (Exception e) { // Expected @@ -120,8 +118,8 @@ public class ZKOperatorTest { // Refresh the segment with unmatched IF_MATCH field when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123"); try { - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, - "otherDownloadUrl", false, null, true, 10); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", + "otherCrypter", 10, true, true, httpHeaders); fail(); } catch (Exception e) { // Expected @@ -131,10 +129,11 @@ public class ZKOperatorTest { // downloadURL and crypter when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345"); when(segmentMetadata.getIndexCreationTime()).thenReturn(456L); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, - "otherDownloadUrl", false, "otherCrypter", true, 10); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", + "otherCrypter", 10, true, true, httpHeaders); segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); + assertNotNull(segmentZKMetadata); assertEquals(segmentZKMetadata.getCrc(), 12345L); // Push time should not change assertEquals(segmentZKMetadata.getPushTime(), pushTime); @@ -152,13 +151,12 @@ public class ZKOperatorTest { when(segmentMetadata.getCrc()).thenReturn("23456"); when(segmentMetadata.getIndexCreationTime()).thenReturn(789L); // Add a tiny sleep to guarantee that refresh time 
is different from the previous round - // 1 second delay to avoid "org.apache.helix.HelixException: Specified EXTERNALVIEW operatorTestTable_OFFLINE is - // not found!" exception from being thrown sporadically. - Thread.sleep(1000L); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, true, httpHeaders, - "otherDownloadUrl", false, "otherCrypter", true, 10); + Thread.sleep(10); + zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, null, null, "otherDownloadUrl", + "otherCrypter", 100, true, true, httpHeaders); segmentZKMetadata = ControllerTestUtils.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); + assertNotNull(segmentZKMetadata); assertEquals(segmentZKMetadata.getCrc(), 23456L); // Push time should not change assertEquals(segmentZKMetadata.getPushTime(), pushTime); @@ -167,7 +165,7 @@ public class ZKOperatorTest { assertTrue(segmentZKMetadata.getRefreshTime() > refreshTime); assertEquals(segmentZKMetadata.getDownloadUrl(), "otherDownloadUrl"); assertEquals(segmentZKMetadata.getCrypterName(), "otherCrypter"); - assertEquals(segmentZKMetadata.getSizeInBytes(), 10); + assertEquals(segmentZKMetadata.getSizeInBytes(), 100); } @AfterClass diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index f52cc2d924..2be71ffd25 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -180,7 +180,7 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); + 
.withSegmentName(_segmentNameStr).withSegmentLocation("location"); response = _segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -257,7 +257,7 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); + .withSegmentName(_segmentNameStr).withSegmentLocation("location"); response = _segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -614,7 +614,7 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); + .withSegmentName(_segmentNameStr).withSegmentLocation("location"); response = _segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -690,7 +690,7 @@ public class SegmentCompletionTest { _segmentCompletionMgr._seconds += 5; params = new Request.Params().withInstanceId(S_1).withStreamPartitionMsgOffset(_s1Offset.toString()) - .withSegmentName(_segmentNameStr); + .withSegmentName(_segmentNameStr).withSegmentLocation("location"); response = _segmentCompletionMgr .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params)); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); @@ -977,7 +977,7 @@ 
public class SegmentCompletionTest { // Commit in 15s _segmentCompletionMgr._seconds += 15; params = new Request.Params().withInstanceId(S_2).withStreamPartitionMsgOffset(_s2Offset.toString()) - .withSegmentName(_segmentNameStr); + .withSegmentName(_segmentNameStr).withSegmentLocation("location"); response = _segmentCompletionMgr.segmentCommitStart(params); Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE); long commitTimeMs = (_segmentCompletionMgr._seconds - startTime) * 1000; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org