This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9bc1e059fb Allow rerunning startReplaceSegment with the same segmentsTo if the previous protocol fails in the middle (lineage entry in IN_PROGRESS status). (#8639) 9bc1e059fb is described below commit 9bc1e059fb5974a8c49523d4e9be1bd47bd88181 Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Wed May 4 13:43:30 2022 -0700 Allow rerunning startReplaceSegment with the same segmentsTo if the previous protocol fails in the middle (lineage entry in IN_PROGRESS status). (#8639) --- .../helix/core/PinotHelixResourceManager.java | 18 +++---- .../helix/core/PinotHelixResourceManagerTest.java | 56 +++++++++++++--------- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f32e2e75e9..0e4d25c9ab 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -3010,7 +3010,7 @@ public class PinotHelixResourceManager { String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId(); // Check that all the segments from 'segmentsFrom' exist in the table - Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType, false)); + Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType, true)); Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format( "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', " + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo, @@ -3060,17 +3060,19 @@ public class PinotHelixResourceManager { // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. // When 'forceCleanup' is enabled, we need to proactively clean up at the following cases: - // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentFrom' values. This is - // used to un-block the segment replacement protocol if the previous attempt failed in the middle. + // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentsFrom' or 'segmentsTo' + // values. This is used to un-block the segment replacement protocol if the previous attempt failed in the + // middle. // 2. Proactively delete the oldest data snapshot to make sure that we only keep at most 2 data snapshots // at any time in case of REFRESH use case. if (forceCleanup) { - if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !Collections - .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom())) { + if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && (!Collections + .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || !Collections + .disjoint(segmentsTo, lineageEntry.getSegmentsTo()))) { LOGGER.info( - "Detected the incomplete lineage entry with the same 'segmentsFrom'. Reverting the lineage " - + "entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, segmentsFrom={}, " - + "segmentsTo={}", tableNameWithType, entryId, lineageEntry.getSegmentsFrom(), + "Detected the incomplete lineage entry with the same 'segmentsFrom' or 'segmentsTo'. Reverting the " + + "lineage entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, " + + "segmentsFrom={}, segmentsTo={}", tableNameWithType, entryId, lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()); // Update segment lineage entry to 'REVERTED' diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index 996f5b521c..8d4c4c2489 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -691,6 +691,18 @@ public class PinotHelixResourceManagerTest { Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getSegmentsTo(), segmentsTo); Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), LineageEntryState.IN_PROGRESS); + // Assuming the replacement fails in the middle, rerunning the protocol with the same segmentsTo will go through, + // and revert the previous lineage entry. + String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager() + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, true); + segmentLineage = SegmentLineageAccessHelper + .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), LineageEntryState.REVERTED); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom(), segmentsFrom); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(), segmentsTo); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.IN_PROGRESS); + // Upload partial data ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s7"), "downloadUrl"); @@ -698,12 +710,12 @@ public class PinotHelixResourceManagerTest { // Start another new segment replacement with empty segmentsFrom, // and check that previous lineages with empty segmentsFrom are not reverted. segmentsTo = Arrays.asList("s9", "s10"); - String lineageEntryId6 = ControllerTestUtils.getHelixResourceManager() + String lineageEntryId7 = ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, true); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId5).getState(), LineageEntryState.IN_PROGRESS); Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.IN_PROGRESS); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), LineageEntryState.IN_PROGRESS); Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED); // Finish the replacement @@ -713,26 +725,26 @@ public class PinotHelixResourceManagerTest { SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s10"), "downloadUrl"); ControllerTestUtils.getHelixResourceManager() - .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId6); + .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId7); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 6); - Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsFrom().isEmpty()); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getSegmentsTo(), Arrays.asList("s9", "s10")); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId6).getState(), LineageEntryState.COMPLETED); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7); + Assert.assertTrue(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom().isEmpty()); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(), Arrays.asList("s9", "s10")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), LineageEntryState.COMPLETED); // Check partial overlap reverts previous lineage // Start a new segment replacement with non-empty segmentsFrom. segmentsFrom = Arrays.asList("s9", "s10"); segmentsTo = Arrays.asList("s11", "s12"); - String lineageEntryId7 = ControllerTestUtils.getHelixResourceManager() + String lineageEntryId8 = ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 7); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsFrom(), segmentsFrom); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getSegmentsTo(), segmentsTo); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), LineageEntryState.IN_PROGRESS); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(), segmentsFrom); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(), segmentsTo); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.IN_PROGRESS); // Upload partial data ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, @@ -742,12 +754,12 @@ public class PinotHelixResourceManagerTest { // and check that previous lineages with overlapped segmentsFrom are reverted. segmentsFrom = Arrays.asList("s0", "s9"); segmentsTo = Arrays.asList("s13", "s14"); - String lineageEntryId8 = ControllerTestUtils.getHelixResourceManager() + String lineageEntryId9 = ControllerTestUtils.getHelixResourceManager() .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, true); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId7).getState(), LineageEntryState.REVERTED); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.IN_PROGRESS); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.REVERTED); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), LineageEntryState.IN_PROGRESS); // Finish the replacement ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, @@ -756,18 +768,18 @@ public class PinotHelixResourceManagerTest { SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s14"), "downloadUrl"); ControllerTestUtils.getHelixResourceManager() - .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId8); + .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId9); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 8); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsFrom(), Arrays.asList("s0", "s9")); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getSegmentsTo(), Arrays.asList("s13", "s14")); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.COMPLETED); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 9); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsFrom(), Arrays.asList("s0", "s9")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getSegmentsTo(), Arrays.asList("s13", "s14")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), LineageEntryState.COMPLETED); // Check endReplaceSegments is idempotent ControllerTestUtils.getHelixResourceManager() - .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId8); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId8).getState(), LineageEntryState.COMPLETED); + .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId9); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId9).getState(), LineageEntryState.COMPLETED); } private void testSegmentReplacementForRefresh() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org