This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch 
add-revertReplaceSegments-in-FileUploadDownloadClient
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 4475d9f9d27cd0b693de600fa9552194d0f693f0
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Nov 18 14:45:37 2021 -0800

    Add getRevertReplaceSegmentRequest method in FileUploadDownloadClient
---
 .../common/utils/FileUploadDownloadClient.java     | 37 ++++++++++++++++++++--
 .../PinotSegmentUploadDownloadRestletResource.java |  3 +-
 .../minion/tasks/SegmentConversionUtils.java       | 12 ++++---
 3 files changed, 44 insertions(+), 8 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index e209b9c..5e506d4 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -123,7 +123,10 @@ public class FileUploadDownloadClient implements Closeable 
{
   private static final String TYPE_DELIMITER = "type=";
   private static final String START_REPLACE_SEGMENTS_PATH = 
"/startReplaceSegments";
   private static final String END_REPLACE_SEGMENTS_PATH = 
"/endReplaceSegments";
+  private static final String REVERT_REPLACE_SEGMENTS_PATH = 
"/revertReplaceSegments";
   private static final String SEGMENT_LINEAGE_ENTRY_ID_PARAMETER = 
"&segmentLineageEntryId=";
+  private static final String FORCE_REVERT_PARAMETER = "&forceRevert=";
+  private static final String FORCE_CLEANUP_PARAMETER = "&forceCleanup=";
   private static final String JSON_CONTENT_TYPE = "application/json";
 
   private static final List<String> SUPPORTED_PROTOCOLS = Arrays.asList(HTTP, 
HTTPS);
@@ -317,10 +320,12 @@ public class FileUploadDownloadClient implements 
Closeable {
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), 
controllerURI.getPort(), SEGMENT_PATH);
   }
 
-  public static URI getStartReplaceSegmentsURI(URI controllerURI, String 
rawTableName, String tableType)
+  public static URI getStartReplaceSegmentsURI(URI controllerURI, String 
rawTableName, String tableType,
+      boolean forceCleanup)
       throws URISyntaxException {
     return getURI(controllerURI.getScheme(), controllerURI.getHost(), 
controllerURI.getPort(),
-        OLD_SEGMENT_PATH + "/" + rawTableName + START_REPLACE_SEGMENTS_PATH, 
TYPE_DELIMITER + tableType);
+        OLD_SEGMENT_PATH + "/" + rawTableName + START_REPLACE_SEGMENTS_PATH,
+        TYPE_DELIMITER + tableType + FORCE_CLEANUP_PARAMETER + forceCleanup);
   }
 
   public static URI getEndReplaceSegmentsURI(URI controllerURI, String 
rawTableName, String tableType,
@@ -331,6 +336,15 @@ public class FileUploadDownloadClient implements Closeable 
{
         TYPE_DELIMITER + tableType + SEGMENT_LINEAGE_ENTRY_ID_PARAMETER + 
segmentLineageEntryId);
   }
 
+  public static URI getRevertReplaceSegmentsURI(URI controllerURI, String 
rawTableName, String tableType,
+      String segmentLineageEntryId, boolean forceRevert)
+      throws URISyntaxException {
+    return getURI(controllerURI.getScheme(), controllerURI.getHost(), 
controllerURI.getPort(),
+        OLD_SEGMENT_PATH + "/" + rawTableName + REVERT_REPLACE_SEGMENTS_PATH,
+        TYPE_DELIMITER + tableType + SEGMENT_LINEAGE_ENTRY_ID_PARAMETER + 
segmentLineageEntryId + FORCE_REVERT_PARAMETER
+            + forceRevert);
+  }
+
   private static HttpUriRequest getUploadFileRequest(String method, URI uri, 
ContentBody contentBody,
       @Nullable List<Header> headers, @Nullable List<NameValuePair> 
parameters, int socketTimeoutMs) {
     // Build the Http entity
@@ -435,6 +449,12 @@ public class FileUploadDownloadClient implements Closeable 
{
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest getRevertReplaceSegmentRequest(URI uri) {
+    RequestBuilder requestBuilder = 
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
+        .setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getSegmentCompletionProtocolRequest(URI uri, 
@Nullable List<Header> headers,
       @Nullable List<NameValuePair> parameters, int socketTimeoutMs) {
     RequestBuilder requestBuilder = 
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
@@ -965,6 +985,19 @@ public class FileUploadDownloadClient implements Closeable 
{
   }
 
   /**
+   * Revert replace segments with default settings.
+   *
+   * @param uri URI
+   * @return Response
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public SimpleHttpResponse revertReplaceSegments(URI uri)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(getRevertReplaceSegmentRequest(uri));
+  }
+
+  /**
    * Deprecated due to lack of auth header support. May break for deployments 
with auth enabled
    *
    * Send segment completion protocol request.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 1a75bfb..0c3b01d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -546,7 +546,8 @@ public class PinotSegmentUploadDownloadRestletResource {
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup") 
@DefaultValue("false") boolean forceCleanup,
-      StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
+      @ApiParam(value = "Fields belonging to start replace segment request", 
required = true)
+          StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
     try {
       TableType tableType = Constants.validateTableType(tableTypeStr);
       if (tableType == null) {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 6c60aed..697dedb 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -128,11 +128,13 @@ public class SegmentConversionUtils {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     SSLContext sslContext = MinionContext.getInstance().getSSLContext();
     try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient(sslContext)) {
-      URI uri = FileUploadDownloadClient.getStartReplaceSegmentsURI(new 
URI(uploadURL), rawTableName, tableType.name());
+      URI uri =
+          FileUploadDownloadClient.getStartReplaceSegmentsURI(new 
URI(uploadURL), rawTableName, tableType.name(), true);
       SimpleHttpResponse response = 
fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest);
       String responseString = response.getResponse();
-      LOGGER.info("Got response {}: {} while uploading table: {}, uploadURL: 
{}, request: {}", response.getStatusCode(),
-          responseString, tableNameWithType, uploadURL, 
startReplaceSegmentsRequest);
+      LOGGER.info(
+          "Got response {}: {} while sending start replace segment request for 
table: {}, uploadURL: {}, request: {}",
+          response.getStatusCode(), responseString, tableNameWithType, 
uploadURL, startReplaceSegmentsRequest);
       return 
JsonUtils.stringToJsonNode(responseString).get("segmentLineageEntryId").asText();
     }
   }
@@ -147,8 +149,8 @@ public class SegmentConversionUtils {
       URI uri = FileUploadDownloadClient
           .getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, 
tableType.name(), segmentLineageEntryId);
       SimpleHttpResponse response = 
fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
-      LOGGER.info("Got response {}: {} while uploading table: {}, uploadURL: 
{}", response.getStatusCode(),
-          response.getResponse(), tableNameWithType, uploadURL);
+      LOGGER.info("Got response {}: {} while sending end replace segment 
request for table: {}, uploadURL: {}",
+          response.getStatusCode(), response.getResponse(), tableNameWithType, 
uploadURL);
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to