This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 27d690d  allow skip proactive cleaning segment lineages (#8240)
27d690d is described below

commit 27d690dfd7d4f872a095c3376ecf1edbabee8c16
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Thu Feb 24 11:50:19 2022 -0800

    allow skip proactive cleaning segment lineages (#8240)
    
    Also refine the logic for lineage cleanup
---
 .../pinot/controller/helix/core/PinotHelixResourceManager.java |  6 +++---
 .../pinot/plugin/minion/tasks/SegmentConversionUtils.java      | 10 ++++++++--
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index c44a363..b28dbe69 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2878,13 +2878,13 @@ public class PinotHelixResourceManager {
           // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
 
           // When 'forceCleanup' is enabled, we need to proactively clean up 
at the following cases:
-          // 1. Revert the lineage entry when we find the lineage entry with 
the same 'segmentFrom' values. This is
+          // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentFrom' values. This is
           //    used to un-block the segment replacement protocol if the 
previous attempt failed in the middle.
           // 2. Proactively delete the oldest data snapshot to make sure that 
we only keep at most 2 data snapshots
           //    at any time in case of REFRESH use case.
           if (forceCleanup) {
-            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
CollectionUtils
-                .isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsFrom())) {
+            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
!Collections
+                .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom())) {
               LOGGER.info(
                   "Detected the incomplete lineage entry with the same 
'segmentsFrom'. Reverting the lineage "
                       + "entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, segmentsFrom={}, "
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index a032d44..129a7c7 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -125,12 +125,18 @@ public class SegmentConversionUtils {
   public static String startSegmentReplace(String tableNameWithType, String 
uploadURL,
       StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable 
String authToken)
       throws Exception {
+    return startSegmentReplace(tableNameWithType, uploadURL, 
startReplaceSegmentsRequest, authToken, true);
+  }
+
+  public static String startSegmentReplace(String tableNameWithType, String 
uploadURL,
+      StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable 
String authToken, boolean forceCleanup)
+      throws Exception {
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     SSLContext sslContext = MinionContext.getInstance().getSSLContext();
     try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient(sslContext)) {
-      URI uri =
-          FileUploadDownloadClient.getStartReplaceSegmentsURI(new 
URI(uploadURL), rawTableName, tableType.name(), true);
+      URI uri = FileUploadDownloadClient
+          .getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, 
tableType.name(), forceCleanup);
       SimpleHttpResponse response =
           fileUploadDownloadClient.startReplaceSegments(uri, 
startReplaceSegmentsRequest, authToken);
       String responseString = response.getResponse();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to