This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-revertReplaceSegments-in-FileUploadDownloadClient in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 4475d9f9d27cd0b693de600fa9552194d0f693f0 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Thu Nov 18 14:45:37 2021 -0800 Add getRevertReplaceSegmentRequest method in FileUploadDownloadClient --- .../common/utils/FileUploadDownloadClient.java | 37 ++++++++++++++++++++-- .../PinotSegmentUploadDownloadRestletResource.java | 3 +- .../minion/tasks/SegmentConversionUtils.java | 12 ++++--- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index e209b9c..5e506d4 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -123,7 +123,10 @@ public class FileUploadDownloadClient implements Closeable { private static final String TYPE_DELIMITER = "type="; private static final String START_REPLACE_SEGMENTS_PATH = "/startReplaceSegments"; private static final String END_REPLACE_SEGMENTS_PATH = "/endReplaceSegments"; + private static final String REVERT_REPLACE_SEGMENTS_PATH = "/revertReplaceSegments"; private static final String SEGMENT_LINEAGE_ENTRY_ID_PARAMETER = "&segmentLineageEntryId="; + private static final String FORCE_REVERT_PARAMETER = "&forceRevert="; + private static final String FORCE_CLEANUP_PARAMETER = "&forceCleanup="; private static final String JSON_CONTENT_TYPE = "application/json"; private static final List<String> SUPPORTED_PROTOCOLS = Arrays.asList(HTTP, HTTPS); @@ -317,10 +320,12 @@ public class FileUploadDownloadClient implements Closeable { return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH); } - public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType) + public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType, + boolean forceCleanup) throws URISyntaxException { return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), - OLD_SEGMENT_PATH + "/" + rawTableName + START_REPLACE_SEGMENTS_PATH, TYPE_DELIMITER + tableType); + OLD_SEGMENT_PATH + "/" + rawTableName + START_REPLACE_SEGMENTS_PATH, + TYPE_DELIMITER + tableType + FORCE_CLEANUP_PARAMETER + forceCleanup); } public static URI getEndReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType, @@ -331,6 +336,15 @@ public class FileUploadDownloadClient implements Closeable { TYPE_DELIMITER + tableType + SEGMENT_LINEAGE_ENTRY_ID_PARAMETER + segmentLineageEntryId); } + public static URI getRevertReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType, + String segmentLineageEntryId, boolean forceRevert) + throws URISyntaxException { + return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), + OLD_SEGMENT_PATH + "/" + rawTableName + REVERT_REPLACE_SEGMENTS_PATH, + TYPE_DELIMITER + tableType + SEGMENT_LINEAGE_ENTRY_ID_PARAMETER + segmentLineageEntryId + FORCE_REVERT_PARAMETER + + forceRevert); + } + private static HttpUriRequest getUploadFileRequest(String method, URI uri, ContentBody contentBody, @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) { // Build the Http entity @@ -435,6 +449,12 @@ public class FileUploadDownloadClient implements Closeable { return requestBuilder.build(); } + private static HttpUriRequest getRevertReplaceSegmentRequest(URI uri) { + RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) + .setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE); + return requestBuilder.build(); + } + private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); @@ -965,6 +985,19 @@ public class FileUploadDownloadClient implements Closeable { } /** + * Revert replace segments with default settings. + * + * @param uri URI + * @return Response + * @throws IOException + * @throws HttpErrorStatusException + */ + public SimpleHttpResponse revertReplaceSegments(URI uri) + throws IOException, HttpErrorStatusException { + return sendRequest(getRevertReplaceSegmentRequest(uri)); + } + + /** * Deprecated due to lack of auth header support. May break for deployments with auth enabled * * Send segment completion protocol request. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 1a75bfb..0c3b01d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -546,7 +546,8 @@ public class PinotSegmentUploadDownloadRestletResource { @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup") @DefaultValue("false") boolean forceCleanup, - StartReplaceSegmentsRequest startReplaceSegmentsRequest) { + @ApiParam(value = "Fields belonging to start replace segment request", required = true) + StartReplaceSegmentsRequest startReplaceSegmentsRequest) { try { TableType tableType = Constants.validateTableType(tableTypeStr); if (tableType == null) { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java index 6c60aed..697dedb 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java @@ -128,11 +128,13 @@ public class SegmentConversionUtils { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); SSLContext sslContext = MinionContext.getInstance().getSSLContext(); try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) { - URI uri = FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name()); + URI uri = + FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true); SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest); String responseString = response.getResponse(); - LOGGER.info("Got response {}: {} while uploading table: {}, uploadURL: {}, request: {}", response.getStatusCode(), - responseString, tableNameWithType, uploadURL, startReplaceSegmentsRequest); + LOGGER.info( + "Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}", + response.getStatusCode(), responseString, tableNameWithType, uploadURL, startReplaceSegmentsRequest); return JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText(); } } @@ -147,8 +149,8 @@ public class SegmentConversionUtils { URI uri = FileUploadDownloadClient .getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId); SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs); - LOGGER.info("Got response {}: {} while uploading table: {}, uploadURL: {}", response.getStatusCode(), - response.getResponse(), tableNameWithType, uploadURL); + LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}", + response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org