This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e3d238a Add forceCleanup option for 'startReplaceSegments' API (#7744) e3d238a is described below commit e3d238ac1d8633331d9507713266e41e6b40f870 Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Thu Nov 11 18:52:49 2021 -0800 Add forceCleanup option for 'startReplaceSegments' API (#7744) --- .../PinotSegmentUploadDownloadRestletResource.java | 3 +- .../helix/core/PinotHelixResourceManager.java | 133 ++++++++++++-------- .../helix/core/PinotHelixResourceManagerTest.java | 134 ++++++++++++++++----- 3 files changed, 189 insertions(+), 81 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 78de2fd..1a75bfb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -545,6 +545,7 @@ public class PinotSegmentUploadDownloadRestletResource { public Response startReplaceSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, + @ApiParam(value = "Force cleanup") @QueryParam("forceCleanup") @DefaultValue("false") boolean forceCleanup, StartReplaceSegmentsRequest startReplaceSegmentsRequest) { try { TableType tableType = Constants.validateTableType(tableTypeStr); @@ -555,7 +556,7 @@ public class PinotSegmentUploadDownloadRestletResource { String tableNameWithType = TableNameBuilder.forType(tableType).tableNameWithType(tableName); String segmentLineageEntryId = _pinotHelixResourceManager .startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(), - startReplaceSegmentsRequest.getSegmentsTo()); + startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup); return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build(); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 22b6872..e32506b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2747,11 +2747,13 @@ public class PinotHelixResourceManager { * @param tableNameWithType Table name with type * @param segmentsFrom a list of segments to be merged * @param segmentsTo a list of merged segments + * @param forceCleanup True for enabling the force segment cleanup * @return Segment lineage entry id * * @throws InvalidConfigException */ - public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) { + public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo, + boolean forceCleanup) { // Create a segment lineage entry id String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId(); @@ -2786,37 +2788,61 @@ public class PinotHelixResourceManager { Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId) == null, String.format("SegmentLineageEntryId (%s) already exists in the segment lineage.", segmentLineageEntryId)); + List<String> segmentsToCleanUp = new ArrayList<>(); for (String entryId : segmentLineage.getLineageEntryIds()) { LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId); - // If segment entry is in 'REVERTED' state, no need to check for 'segmentsFrom'. - if (lineageEntry.getState() != LineageEntryState.REVERTED) { + // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard + // the entry as not existing. + if (lineageEntry.getState() == LineageEntryState.REVERTED) { + continue; + } + + // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. + + // When 'forceCleanup' is enabled, we need to proactively revert the lineage entry when we find the lineage + // entry with the same 'segmentFrom' values. + if (forceCleanup && lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils + .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) { + // Update segment lineage entry to 'REVERTED' + updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, entryId, lineageEntry); + + // Add segments for proactive clean-up. + segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo()); + } else { // Check that any segment from 'segmentsFrom' does not appear twice. Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String - .format( - "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from " - + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType, - lineageEntry.getSegmentsFrom(), segmentsFrom)); + .format("It is not allowed to replace segments that are already replaced. (tableName = %s, " + + "segmentsFrom from the existing lineage entry = %s, requested segmentsFrom = %s)", + tableNameWithType, lineageEntry.getSegmentsFrom(), segmentsFrom)); + + // Check that any segment from 'segmentTo' does not appear twice. + Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format( + "It is not allowed to have the same segment name for segments in 'segmentsTo'. (tableName = %s, " + + "segmentsTo from the existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType, + lineageEntry.getSegmentsTo(), segmentsTo)); } - - // Check that merged segments name cannot be the same. - Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format( - "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from " - + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType, - lineageEntry.getSegmentsTo(), segmentsTo)); } // Update lineage entry segmentLineage.addLineageEntry(segmentLineageEntryId, new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); - // Write back to the lineage entry - return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion); + // Write back to the lineage entry to the property store + if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { + // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it + // is safe to physically delete segments. + if (!segmentsToCleanUp.isEmpty()) { + deleteSegments(tableNameWithType, segmentsToCleanUp); + } + return true; + } else { + return false; + } }); } catch (Exception e) { - String errorMsg = String - .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)", - tableNameWithType, segmentsFrom, segmentsTo); + String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. " + + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo); LOGGER.error(errorMsg, e); throw new RuntimeException(errorMsg, e); } @@ -2860,10 +2886,11 @@ public class PinotHelixResourceManager { .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType, segmentLineageEntryId)); - // NO-OPS if the entry is already completed - if (lineageEntry.getState() == LineageEntryState.COMPLETED) { - LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, " - + "segmentLineageEntryId={})", tableNameWithType, segmentLineageEntryId); + // NO-OPS if the entry is already 'COMPLETED' or 'REVERTED' + if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS) { + LOGGER.warn("Lineage entry state is not 'IN_PROGRESS'. Cannot update to 'COMPLETED'. (tableNameWithType={}, " + + "segmentLineageEntryId={}, state={})", tableNameWithType, segmentLineageEntryId, + lineageEntry.getState()); return true; } @@ -2901,9 +2928,8 @@ public class PinotHelixResourceManager { } }); } catch (Exception e) { - String errorMsg = String - .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)", - tableNameWithType, segmentLineageEntryId); + String errorMsg = String.format("Failed to update the segment lineage during endReplaceSegments. " + + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId); LOGGER.error(errorMsg, e); throw new RuntimeException(errorMsg, e); } @@ -2943,29 +2969,19 @@ public class PinotHelixResourceManager { .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType, segmentLineageEntryId)); - if (lineageEntry.getState() != LineageEntryState.COMPLETED) { - // We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS", we only allow to - // revert when 'forceRevert' is set to true. - if (lineageEntry.getState() != LineageEntryState.IN_PROGRESS || !forceRevert) { - LOGGER.warn("Lineage state is not valid. Cannot revert the lineage entry. (tableNameWithType={}, " - + "segmentLineageEntryId={}, segmentLineageEntrySate={}, forceRevert={})", tableNameWithType, - segmentLineageEntryId, lineageEntry.getState(), forceRevert); - return false; - } + // We do not allow to revert the lineage entry with 'REVERTED' state. For 'IN_PROGRESS", we only allow to + // revert when 'forceRevert' is set to true. + if (lineageEntry.getState() == LineageEntryState.REVERTED || ( + lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !forceRevert)) { + String errorMsg = String.format( + "Lineage state is not valid. Cannot update the lineage entry to be 'REVERTED'. (tableNameWithType=%s, " + + "segmentLineageEntryId=%s, segmentLineageEntryState=%s, forceRevert=%s)", tableNameWithType, + segmentLineageEntryId, lineageEntry.getState(), forceRevert); + throw new RuntimeException(errorMsg); } - // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view. - Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType); - Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format( - "Not all segments from 'segmentFrom' are in ONLINE state in the external view. (tableName = '%s', " - + "segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, lineageEntry.getSegmentsFrom(), - onlineSegments)); - - // Update lineage entry - LineageEntry newLineageEntry = - new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED, - System.currentTimeMillis()); - segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry); + // Update segment lineage entry to 'REVERTED' + updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry); // Write back to the lineage entry if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { @@ -2973,15 +2989,19 @@ public class PinotHelixResourceManager { // routing table because it is possible that there has been no EV change but the routing result may be // different after updating the lineage entry. sendRoutingTableRebuildMessage(tableNameWithType); + + // Invoke the proactive clean-up for segments that we no longer needs in case 'forceRevert' is enabled + if (forceRevert) { + deleteSegments(tableNameWithType, lineageEntry.getSegmentsTo()); + } return true; } else { return false; } }); } catch (Exception e) { - String errorMsg = String - .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)", - tableNameWithType, segmentLineageEntryId); + String errorMsg = String.format("Failed to update the segment lineage during revertReplaceSegments. " + + "(tableName = %s, segmentLineageEntryId = %s)", tableNameWithType, segmentLineageEntryId); LOGGER.error(errorMsg, e); throw new RuntimeException(errorMsg, e); } @@ -2991,6 +3011,21 @@ public class PinotHelixResourceManager { tableNameWithType, segmentLineageEntryId); } + private void updateSegmentLineageEntryToReverted(String tableNameWithType, SegmentLineage segmentLineage, + String segmentLineageEntryId, LineageEntry lineageEntry) { + // Check that all segments from 'segmentsFrom' are in ONLINE state in the external view. + Set<String> onlineSegments = getOnlineSegmentsFromExternalView(tableNameWithType); + Preconditions.checkArgument(onlineSegments.containsAll(lineageEntry.getSegmentsFrom()), String.format( + "Failed to update the lineage to be 'REVERTED'. Not all segments from 'segmentFrom' are in ONLINE state " + + "in the external view. (tableName = '%s', segmentsFrom = '%s', onlineSegments = '%s'", tableNameWithType, + lineageEntry.getSegmentsFrom(), onlineSegments)); + + // Update lineage entry + segmentLineage.updateLineageEntry(segmentLineageEntryId, + new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.REVERTED, + System.currentTimeMillis())); + } + private void waitForSegmentsBecomeOnline(String tableNameWithType, Set<String> segmentsToCheck) throws InterruptedException, TimeoutException { long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS; diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index 89e98b0..ae524a6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -471,7 +471,7 @@ public class PinotHelixResourceManagerTest { List<String> segmentsTo = Arrays.asList("s5", "s6"); String lineageEntryId = ControllerTestUtils.getHelixResourceManager() - .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); SegmentLineage segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1); @@ -484,7 +484,7 @@ public class PinotHelixResourceManagerTest { segmentsTo = Arrays.asList("s3", "s4"); try { ControllerTestUtils.getHelixResourceManager() - .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); } catch (Exception e) { // expected } @@ -492,34 +492,16 @@ public class PinotHelixResourceManagerTest { segmentsTo = Arrays.asList("s2"); try { ControllerTestUtils.getHelixResourceManager() - .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); } catch (Exception e) { // expected } // Check invalid segmentsFrom segmentsFrom = Arrays.asList("s1", "s6"); - segmentsTo = Arrays.asList("merged1", "merged2"); try { ControllerTestUtils.getHelixResourceManager() - .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); - } catch (Exception e) { - // expected - } - - segmentsFrom = Arrays.asList("s1", "s2"); - String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager() - .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); - segmentLineage = SegmentLineageAccessHelper - .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), segmentsTo); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS); - - try { - ControllerTestUtils.getHelixResourceManager() - .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); } catch (Exception e) { // expected } @@ -556,27 +538,117 @@ public class PinotHelixResourceManagerTest { .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1); Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>()); Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s5", "s6")); Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED); + // Start the new segment replacement + segmentsFrom = Arrays.asList("s1", "s2"); + segmentsTo = Arrays.asList("merged_t1_0", "merged_t1_1"); + String lineageEntryId2 = ControllerTestUtils.getHelixResourceManager() + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + segmentLineage = SegmentLineageAccessHelper + .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), + Arrays.asList("merged_t1_0", "merged_t1_1")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS); + + // Upload partial data ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged1"), + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t1_0"), + "downloadUrl"); + + IdealState idealState = + ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertTrue(!idealState.getInstanceSet("merged_t1_0").isEmpty()); + + // Try to revert the entry with partial data uploaded without forceRevert + try { + ControllerTestUtils.getHelixResourceManager() + .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, false); + } catch (Exception e) { + // expected + } + + // Try to revert the entry with partial data uploaded with forceRevert + ControllerTestUtils.getHelixResourceManager() + .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, true); + segmentLineage = SegmentLineageAccessHelper + .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.REVERTED); + + // 'merged_t1_0' segment should be proactively cleaned up + idealState = + ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertTrue(idealState.getInstanceSet("merged_t1_0").isEmpty()); + + // Start new segment replacement since the above entry is reverted + segmentsFrom = Arrays.asList("s1", "s2"); + segmentsTo = Arrays.asList("merged_t2_0", "merged_t2_1"); + String lineageEntryId3 = ControllerTestUtils.getHelixResourceManager() + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + segmentLineage = SegmentLineageAccessHelper + .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 3); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), segmentsFrom); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(), segmentsTo); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.IN_PROGRESS); + + // Upload partial data + ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t2_0"), + "downloadUrl"); + + // Without force cleanup, 'startReplaceSegments' should fail because of duplicate segments on 'segmentFrom'. + segmentsTo = Arrays.asList("merged_t3_0", "merged_t3_1"); + try { + ControllerTestUtils.getHelixResourceManager() + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false); + } catch (Exception e) { + // expected + } + + // Test force clean up case + String lineageEntryId4 = ControllerTestUtils.getHelixResourceManager() + .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, true); + segmentLineage = SegmentLineageAccessHelper + .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsFrom(), Arrays.asList("s1", "s2")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getSegmentsTo(), + Arrays.asList("merged_t2_0", "merged_t2_1")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId3).getState(), LineageEntryState.REVERTED); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(), Arrays.asList("s1", "s2")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(), + Arrays.asList("merged_t3_0", "merged_t3_1")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), LineageEntryState.IN_PROGRESS); + + // 'merged_t2_0' segment should be proactively cleaned up + idealState = + ControllerTestUtils.getHelixResourceManager().getTableIdealState(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertTrue(idealState.getInstanceSet("merged_t2_0").isEmpty()); + + // Upload segments again + ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t3_0"), "downloadUrl"); ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, - SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged2"), + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged_t3_1"), "downloadUrl"); + // Finish the replacement ControllerTestUtils.getHelixResourceManager() - .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2); + .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId4); segmentLineage = SegmentLineageAccessHelper .getSegmentLineage(ControllerTestUtils.getPropertyStore(), OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2")); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), - Arrays.asList("merged1", "merged2")); - Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.COMPLETED); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 4); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsFrom(), Arrays.asList("s1", "s2")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getSegmentsTo(), + Arrays.asList("merged_t3_0", "merged_t3_1")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId4).getState(), LineageEntryState.COMPLETED); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org