This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bc1e059fb Allow rerunning startReplaceSegment with the same segmentsTo if the previous protocol fails in the middle (lineage entry in IN_PROGRESS status). (#8639)
9bc1e059fb is described below

commit 9bc1e059fb5974a8c49523d4e9be1bd47bd88181
Author: Jiapeng Tao <jia...@linkedin.com>
AuthorDate: Wed May 4 13:43:30 2022 -0700

    Allow rerunning startReplaceSegment with the same segmentsTo if the previous protocol fails in the middle (lineage entry in IN_PROGRESS status). (#8639)
---
 .../helix/core/PinotHelixResourceManager.java      | 18 +++----
 .../helix/core/PinotHelixResourceManagerTest.java  | 56 +++++++++++++---------
 2 files changed, 44 insertions(+), 30 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f32e2e75e9..0e4d25c9ab 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3010,7 +3010,7 @@ public class PinotHelixResourceManager {
     String segmentLineageEntryId = 
SegmentLineageUtils.generateLineageEntryId();
 
     // Check that all the segments from 'segmentsFrom' exist in the table
-    Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType, false));
+    Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType, true));
     Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), 
String.format(
         "Not all segments from 'segmentsFrom' are available in the table. 
(tableName = '%s', segmentsFrom = '%s', "
             + "segmentsTo = '%s', segmentsFromTable = '%s')", 
tableNameWithType, segmentsFrom, segmentsTo,
@@ -3060,17 +3060,19 @@ public class PinotHelixResourceManager {
           // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'.
 
           // When 'forceCleanup' is enabled, we need to proactively clean up 
at the following cases:
-          // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentFrom' values. This is
-          //    used to un-block the segment replacement protocol if the 
previous attempt failed in the middle.
+          // 1. Revert the lineage entry when we find the lineage entry with 
overlapped 'segmentsFrom' or 'segmentsTo'
+          //    values. This is used to un-block the segment replacement 
protocol if the previous attempt failed in the
+          //    middle.
           // 2. Proactively delete the oldest data snapshot to make sure that 
we only keep at most 2 data snapshots
           //    at any time in case of REFRESH use case.
           if (forceCleanup) {
-            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
!Collections
-                .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom())) {
+            if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && 
(!Collections
+                .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || 
!Collections
+                .disjoint(segmentsTo, lineageEntry.getSegmentsTo()))) {
               LOGGER.info(
-                  "Detected the incomplete lineage entry with the same 
'segmentsFrom'. Reverting the lineage "
-                      + "entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, segmentsFrom={}, "
-                      + "segmentsTo={}", tableNameWithType, entryId, 
lineageEntry.getSegmentsFrom(),
+                  "Detected the incomplete lineage entry with the same 
'segmentsFrom' or 'segmentsTo'. Reverting the "
+                      + "lineage entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, "
+                      + "segmentsFrom={}, segmentsTo={}", tableNameWithType, 
entryId, lineageEntry.getSegmentsFrom(),
                   lineageEntry.getSegmentsTo());
 
               // Update segment lineage entry to 'REVERTED'
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 996f5b521c..8d4c4c2489 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -691,6 +691,18 @@ public class PinotHelixResourceManagerTest {
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsTo(),
 segmentsTo);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), 
LineageEntryState.IN_PROGRESS);
 
+    // Assuming the replacement fails in the middle, rerunning the protocol 
with the same segmentsTo will go through,
+    // and revert the previous lineage entry.
+    String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager()
+        .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
+    segmentLineage = SegmentLineageAccessHelper
+        .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), 
LineageEntryState.REVERTED);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(),
 segmentsFrom);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
 segmentsTo);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), 
LineageEntryState.IN_PROGRESS);
+
     // Upload partial data
     
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
         
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
 "s7"), "downloadUrl");
@@ -698,12 +710,12 @@ public class PinotHelixResourceManagerTest {
     // Start another new segment replacement with empty segmentsFrom,
     // and check that previous lineages with empty segmentsFrom are not 
reverted.
     segmentsTo = Arrays.asList("s9", "s10");
-    String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager()
+    String lineageEntryId7 = ControllerTestUtils.getHelixResourceManager()
         .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), 
LineageEntryState.IN_PROGRESS);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), 
LineageEntryState.IN_PROGRESS);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), 
LineageEntryState.IN_PROGRESS);
     
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), 
LineageEntryState.COMPLETED);
 
     // Finish the replacement
@@ -713,26 +725,26 @@ public class PinotHelixResourceManagerTest {
         
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
 "s10"), "downloadUrl");
 
     ControllerTestUtils.getHelixResourceManager()
-        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId6);
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId7);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6);
-    
Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom().isEmpty());
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(),
 Arrays.asList("s9", "s10"));
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), 
LineageEntryState.COMPLETED);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
+    
Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom().isEmpty());
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(),
 Arrays.asList("s9", "s10"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), 
LineageEntryState.COMPLETED);
 
     // Check partial overlap reverts previous lineage
     // Start a new segment replacement with non-empty segmentsFrom.
     segmentsFrom = Arrays.asList("s9", "s10");
     segmentsTo = Arrays.asList("s11", "s12");
-    String lineageEntryId7 = ControllerTestUtils.getHelixResourceManager()
+    String lineageEntryId8 = ControllerTestUtils.getHelixResourceManager()
         .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, false);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom(),
 segmentsFrom);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(),
 segmentsTo);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
 segmentsFrom);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
 segmentsTo);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.IN_PROGRESS);
 
     // Upload partial data
     
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
@@ -742,12 +754,12 @@ public class PinotHelixResourceManagerTest {
     // and check that previous lineages with overlapped segmentsFrom are 
reverted.
     segmentsFrom = Arrays.asList("s0", "s9");
     segmentsTo = Arrays.asList("s13", "s14");
-    String lineageEntryId8 = ControllerTestUtils.getHelixResourceManager()
+    String lineageEntryId9 = ControllerTestUtils.getHelixResourceManager()
         .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
segmentsFrom, segmentsTo, true);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), 
LineageEntryState.REVERTED);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.IN_PROGRESS);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.REVERTED);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), 
LineageEntryState.IN_PROGRESS);
 
     // Finish the replacement
     
ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
@@ -756,18 +768,18 @@ public class PinotHelixResourceManagerTest {
         
SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
 "s14"), "downloadUrl");
 
     ControllerTestUtils.getHelixResourceManager()
-        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId8);
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId9);
     segmentLineage = SegmentLineageAccessHelper
         .getSegmentLineage(ControllerTestUtils.getPropertyStore(), 
OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(),
 Arrays.asList("s0", "s9"));
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(),
 Arrays.asList("s13", "s14"));
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.COMPLETED);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 9);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(),
 Arrays.asList("s0", "s9"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(),
 Arrays.asList("s13", "s14"));
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), 
LineageEntryState.COMPLETED);
 
     // Check endReplaceSegments is idempotent
     ControllerTestUtils.getHelixResourceManager()
-        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId8);
-    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), 
LineageEntryState.COMPLETED);
+        .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, 
lineageEntryId9);
+    
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), 
LineageEntryState.COMPLETED);
   }
 
   private void testSegmentReplacementForRefresh()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to