This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 08f0f32 add auth token for segment replace rest APIs (#8146) 08f0f32 is described below commit 08f0f32501602c3c74f8f70a124f0190423e43c0 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Mon Feb 7 15:04:28 2022 -0800 add auth token for segment replace rest APIs (#8146) Add auth token param for segment replace rest APIs, as done for the other rest APIs in FileUploadDownloadClient --- .../common/utils/FileUploadDownloadClient.java | 22 ++++++++++++++++------ .../BaseMultipleSegmentsConversionExecutor.java | 7 +++---- .../minion/tasks/SegmentConversionUtils.java | 10 ++++++---- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 0cb747c..fff6637 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -436,17 +436,24 @@ public class FileUploadDownloadClient implements Closeable { return requestBuilder.build(); } - private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs) { + private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs, + @Nullable String authToken) { RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1).setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE) .setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON)); + if (StringUtils.isNotBlank(authToken)) { + requestBuilder.addHeader("Authorization", authToken); + } setTimeout(requestBuilder, socketTimeoutMs); return requestBuilder.build(); } - private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs) { + private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) { RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1) .setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE); + if (StringUtils.isNotBlank(authToken)) { + requestBuilder.addHeader("Authorization", authToken); + } setTimeout(requestBuilder, socketTimeoutMs); return requestBuilder.build(); } @@ -1023,14 +1030,16 @@ public class FileUploadDownloadClient implements Closeable { * * @param uri URI * @param startReplaceSegmentsRequest request + * @param authToken auth token * @return Response * @throws IOException * @throws HttpErrorStatusException */ - public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest) + public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest, + @Nullable String authToken) throws IOException, HttpErrorStatusException { return sendRequest(getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest), - DEFAULT_SOCKET_TIMEOUT_MS)); + DEFAULT_SOCKET_TIMEOUT_MS, authToken)); } /** @@ -1038,13 +1047,14 @@ public class FileUploadDownloadClient implements Closeable { * * @param uri URI * @oaram socketTimeoutMs Socket timeout in milliseconds + * @param authToken auth token * @return Response * @throws IOException * @throws HttpErrorStatusException */ - public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs) + public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken) throws IOException, HttpErrorStatusException { - return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs)); + return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken)); } /** diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index cc9370d..8860728 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -176,7 +176,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe List<String> segmentsTo = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList()); lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL, - new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo)); + new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken); } // Upload the tarred segments @@ -213,9 +213,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe // Update the segment lineage to indicate that the segment replacement is done. if (replaceSegmentsEnabled) { - SegmentConversionUtils - .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, - _minionConf.getEndReplaceSegmentsTimeoutMs()); + SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, + _minionConf.getEndReplaceSegmentsTimeoutMs(), authToken); } String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java index 697dedb..a032d44 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java @@ -23,6 +23,7 @@ import java.io.File; import java.net.URI; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import org.apache.http.Header; import org.apache.http.HttpHeaders; @@ -122,7 +123,7 @@ public class SegmentConversionUtils { } public static String startSegmentReplace(String tableNameWithType, String uploadURL, - StartReplaceSegmentsRequest startReplaceSegmentsRequest) + StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken) throws Exception { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); @@ -130,7 +131,8 @@ public class SegmentConversionUtils { try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) { URI uri = FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true); - SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest); + SimpleHttpResponse response = + fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken); String responseString = response.getResponse(); LOGGER.info( "Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}", @@ -140,7 +142,7 @@ public class SegmentConversionUtils { } public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId, - int socketTimeoutMs) + int socketTimeoutMs, @Nullable String authToken) throws Exception { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); @@ -148,7 +150,7 @@ public class SegmentConversionUtils { try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) { URI uri = FileUploadDownloadClient .getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId); - SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs); + SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken); LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}", response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org