This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb572bd  Enhance revertReplaceSegments api so reverting entry1 in the 
following example is not allowed: (#8166)
fb572bd is described below

commit fb572bd0aba20d2b8a83320df6dd24cb0c654b30
Author: Jiapeng Tao <jia...@linkedin.com>
AuthorDate: Thu Feb 10 18:33:21 2022 -0800

    Enhance revertReplaceSegments api so reverting entry1 in the following 
example is not allowed: (#8166)
    
    entry1: {Seg_0 -> Seg1, COMPLETED}
    entry2: {Seg_1 -> Seg2, COMPLETED/IN_PROGRESS}
---
 .../helix/core/PinotHelixResourceManager.java       | 21 ++++++++++++++++++++-
 .../helix/core/PinotHelixResourceManagerTest.java   | 18 ++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 938422c..38d47d5 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2871,7 +2871,7 @@ public class PinotHelixResourceManager {
           //    at any time in case of REFRESH use case.
           if (forceCleanup) {
             if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
CollectionUtils
-              .isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsFrom())) {
+                .isEqualCollection(segmentsFrom, 
lineageEntry.getSegmentsFrom())) {
               LOGGER.info(
                   "Detected the incomplete lineage entry with the same 
'segmentsFrom'. Reverting the lineage "
                       + "entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, segmentsFrom={}, "
@@ -3078,6 +3078,25 @@ public class PinotHelixResourceManager {
           throw new RuntimeException(errorMsg);
         }
 
+        // We do not allow to revert the lineage entry which segments in 
'segmentsTo' appear in 'segmentsFrom' of other
+        // 'IN_PROGRESS' or 'COMPLETED' entries. E.g. we do not allow 
reverting entry1 because it will block reverting
+        // entry2.
+        // entry1: {(Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5), COMPLETED}
+        // entry2: {(Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8), 
IN_PROGRESS/COMPLETED}
+        // TODO: need to expand the logic to revert multiple entries in one go 
when we support > 2 data snapshots
+        for (String currentEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry currentLineageEntry = 
segmentLineage.getLineageEntry(currentEntryId);
+          if (currentLineageEntry.getState() == LineageEntryState.IN_PROGRESS
+              || currentLineageEntry.getState() == 
LineageEntryState.COMPLETED) {
+            
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), 
currentLineageEntry
+                .getSegmentsFrom()), String.format("Cannot revert lineage 
entry, found segments from 'segmentsTo' "
+                    + "appear in 'segmentsFrom' of another lineage entry. 
(tableNameWithType='%s', "
+                    + "segmentLineageEntryId='%s', segmentsTo = '%s', 
segmentLineageEntryId='%s' "
+                    + "segmentsFrom = '%s')", tableNameWithType, 
segmentLineageEntryId, lineageEntry.getSegmentsTo(),
+                currentEntryId, currentLineageEntry.getSegmentsFrom()));
+          }
+        }
+
         // Update segment lineage entry to 'REVERTED'
         updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, 
segmentLineageEntryId, lineageEntry);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index f41f26e..dac5447 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -75,6 +75,7 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.LEAD_CONTROLLER_R
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
 import static 
org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+import static org.testng.Assert.fail;
 
 
 public class PinotHelixResourceManagerTest {
@@ -491,6 +492,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -499,6 +501,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -508,6 +511,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -515,6 +519,7 @@ public class PinotHelixResourceManagerTest {
     // Invalid table
     try {
       
ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_TABLE_NAME,
 lineageEntryId);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -522,6 +527,7 @@ public class PinotHelixResourceManagerTest {
     // Invalid lineage entry id
     try {
       
ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
 "aaa");
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -530,6 +536,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -575,6 +582,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId2, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -613,6 +621,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -755,6 +764,14 @@ public class PinotHelixResourceManagerTest {
     Assert.assertEquals(new 
HashSet<>(ControllerTestUtils.getHelixResourceManager()
             .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
true)),
         new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+    // Try to revert the first entry should fail
+    try {
+      ControllerTestUtils.getHelixResourceManager()
+          
.revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId, false);
+      fail();
+    } catch (Exception e) {
+      // expected
+    }
 
     // Add partial segments to indicate incomplete protocol
     
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
@@ -799,6 +816,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           
.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, 
lineageEntryId2);
+      fail();
     } catch (Exception e) {
       // expected
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to