This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 27d690d allow skip proactive cleaning segment lineages (#8240) 27d690d is described below commit 27d690dfd7d4f872a095c3376ecf1edbabee8c16 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Thu Feb 24 11:50:19 2022 -0800 allow skip proactive cleaning segment lineages (#8240) Also refine the logic for lineage cleanup --- .../pinot/controller/helix/core/PinotHelixResourceManager.java | 6 +++--- .../pinot/plugin/minion/tasks/SegmentConversionUtils.java | 10 ++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index c44a363..b28dbe69 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2878,13 +2878,13 @@ public class PinotHelixResourceManager { // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. // When 'forceCleanup' is enabled, we need to proactively clean up at the following cases: - // 1. Revert the lineage entry when we find the lineage entry with the same 'segmentFrom' values. This is + // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentFrom' values. This is // used to un-block the segment replacement protocol if the previous attempt failed in the middle. // 2. Proactively delete the oldest data snapshot to make sure that we only keep at most 2 data snapshots // at any time in case of REFRESH use case. if (forceCleanup) { - if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils - .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) { + if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && !Collections + .disjoint(segmentsFrom, lineageEntry.getSegmentsFrom())) { LOGGER.info( "Detected the incomplete lineage entry with the same 'segmentsFrom'. Reverting the lineage " + "entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, segmentsFrom={}, " diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java index a032d44..129a7c7 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java @@ -125,12 +125,18 @@ public class SegmentConversionUtils { public static String startSegmentReplace(String tableNameWithType, String uploadURL, StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken) throws Exception { + return startSegmentReplace(tableNameWithType, uploadURL, startReplaceSegmentsRequest, authToken, true); + } + + public static String startSegmentReplace(String tableNameWithType, String uploadURL, + StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken, boolean forceCleanup) + throws Exception { String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); SSLContext sslContext = MinionContext.getInstance().getSSLContext(); try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) { - URI uri = - FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true); + URI uri = FileUploadDownloadClient + .getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), forceCleanup); SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken); String responseString = response.getResponse(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org