This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fb572bd Enhance revertReplaceSegments api so reverting entry1 in the following example is not allowed: (#8166) fb572bd is described below commit fb572bd0aba20d2b8a83320df6dd24cb0c654b30 Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Thu Feb 10 18:33:21 2022 -0800 Enhance revertReplaceSegments api so reverting entry1 in the following example is not allowed: (#8166) entry1: {Seg_0 -> Seg_1, COMPLETED} entry2: {Seg_1 -> Seg_2, COMPLETED/IN_PROGRESS} --- .../helix/core/PinotHelixResourceManager.java | 21 ++++++++++++++++++++- .../helix/core/PinotHelixResourceManagerTest.java | 18 ++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 938422c..38d47d5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2871,7 +2871,7 @@ public class PinotHelixResourceManager { // at any time in case of REFRESH use case. if (forceCleanup) { if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils - .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) { + .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) { LOGGER.info( "Detected the incomplete lineage entry with the same 'segmentsFrom'. Reverting the lineage " + "entry to unblock the new segment protocol. 
tableNameWithType={}, entryId={}, segmentsFrom={}, " @@ -3078,6 +3078,25 @@ public class PinotHelixResourceManager { throw new RuntimeException(errorMsg); } + // We do not allow to revert the lineage entry which segments in 'segmentsTo' appear in 'segmentsFrom' of other + // 'IN_PROGRESS' or 'COMPLETED' entries. E.g. we do not allow reverting entry1 because it will block reverting + // entry2. + // entry1: {(Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5), COMPLETED} + // entry2: {(Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8), IN_PROGRESS/COMPLETED} + // TODO: need to expand the logic to revert multiple entries in one go when we support > 2 data snapshots + for (String currentEntryId : segmentLineage.getLineageEntryIds()) { + LineageEntry currentLineageEntry = segmentLineage.getLineageEntry(currentEntryId); + if (currentLineageEntry.getState() == LineageEntryState.IN_PROGRESS + || currentLineageEntry.getState() == LineageEntryState.COMPLETED) { + Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), currentLineageEntry + .getSegmentsFrom()), String.format("Cannot revert lineage entry, found segments from 'segmentsTo' " + + "appear in 'segmentsFrom' of another lineage entry. 
(tableNameWithType='%s', " + + "segmentLineageEntryId='%s', segmentsTo = '%s', segmentLineageEntryId='%s' " + + "segmentsFrom = '%s')", tableNameWithType, segmentLineageEntryId, lineageEntry.getSegmentsTo(), + currentEntryId, currentLineageEntry.getSegmentsFrom())); + } + } + // Update segment lineage entry to 'REVERTED' updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index f41f26e..dac5447 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -75,6 +75,7 @@ import static org.apache.pinot.spi.utils.CommonConstants.Helix.LEAD_CONTROLLER_R import static org.apache.pinot.spi.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE; import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT; +import static org.testng.Assert.fail; public class PinotHelixResourceManagerTest { @@ -491,6 +492,7 @@ public class PinotHelixResourceManagerTest { try { ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + fail(); } catch (Exception e) { // expected } @@ -499,6 +501,7 @@ public class PinotHelixResourceManagerTest { try { ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + fail(); } catch (Exception e) { // expected } @@ -508,6 +511,7 @@ public class PinotHelixResourceManagerTest { try { 
ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + fail(); } catch (Exception e) { // expected } @@ -515,6 +519,7 @@ public class PinotHelixResourceManagerTest { // Invalid table try { ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId); + fail(); } catch (Exception e) { // expected } @@ -522,6 +527,7 @@ public class PinotHelixResourceManagerTest { // Invalid lineage entry id try { ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "aaa"); + fail(); } catch (Exception e) { // expected } @@ -530,6 +536,7 @@ public class PinotHelixResourceManagerTest { try { ControllerTestUtils.getHelixResourceManager() .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId); + fail(); } catch (Exception e) { // expected } @@ -575,6 +582,7 @@ public class PinotHelixResourceManagerTest { try { ControllerTestUtils.getHelixResourceManager() .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, false); + fail(); } catch (Exception e) { // expected } @@ -613,6 +621,7 @@ public class PinotHelixResourceManagerTest { try { ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + fail(); } catch (Exception e) { // expected } @@ -755,6 +764,14 @@ public class PinotHelixResourceManagerTest { Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager() .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)), new HashSet<>(Arrays.asList("s3", "s4", "s5"))); + // Try to revert the first entry should fail + try { + ControllerTestUtils.getHelixResourceManager() + .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId, false); + fail(); + } catch (Exception e) { + // expected + } // Add partial 
segments to indicate incomplete protocol ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, @@ -799,6 +816,7 @@ public class PinotHelixResourceManagerTest { try { ControllerTestUtils.getHelixResourceManager() .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId2); + fail(); } catch (Exception e) { // expected } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org