xiangfu0 commented on code in PR #13735: URL: https://github.com/apache/pinot/pull/13735#discussion_r1706016936
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -3475,167 +3486,168 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen Preconditions.checkState(!segmentsForTable.contains(segment), "Segment: %s from 'segmentsTo' exists in table: %s", segment, tableNameWithType); } + List<String> segmentsToCleanUp = new ArrayList<>(); + synchronized (getLineageUpdaterLock(tableNameWithType)) { + try { + DEFAULT_RETRY_POLICY.attempt(() -> { + // Fetch table config + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); + + // Fetch the segment lineage metadata + ZNRecord segmentLineageZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); + SegmentLineage segmentLineage; + int expectedVersion; + if (segmentLineageZNRecord == null) { + segmentLineage = new SegmentLineage(tableNameWithType); + expectedVersion = -1; + } else { + segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); + expectedVersion = segmentLineageZNRecord.getVersion(); + } + // Check that the segment lineage entry id doesn't exist in the segment lineage + Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) == null, + "Entry id: %s already exists in the segment lineage for table: %s", segmentLineageEntryId, + tableNameWithType); + + Iterator<Map.Entry<String, LineageEntry>> entryIterator = + segmentLineage.getLineageEntries().entrySet().iterator(); + while (entryIterator.hasNext()) { + Map.Entry<String, LineageEntry> entry = entryIterator.next(); + String entryId = entry.getKey(); + LineageEntry lineageEntry = entry.getValue(); + + // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard + // the entry as not existing. 
+ if (lineageEntry.getState() == LineageEntryState.REVERTED) { + // When 'forceCleanup' is enabled, proactively clean up 'segmentsTo' since it's safe to do so. + if (forceCleanup) { + segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo()); + } + continue; + } - try { - DEFAULT_RETRY_POLICY.attempt(() -> { - // Fetch table config - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); - - // Fetch the segment lineage metadata - ZNRecord segmentLineageZNRecord = - SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); - SegmentLineage segmentLineage; - int expectedVersion; - if (segmentLineageZNRecord == null) { - segmentLineage = new SegmentLineage(tableNameWithType); - expectedVersion = -1; - } else { - segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); - expectedVersion = segmentLineageZNRecord.getVersion(); - } - // Check that the segment lineage entry id doesn't exist in the segment lineage - Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) == null, - "Entry id: %s already exists in the segment lineage for table: %s", segmentLineageEntryId, - tableNameWithType); + // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. - List<String> segmentsToCleanUp = new ArrayList<>(); - Iterator<Map.Entry<String, LineageEntry>> entryIterator = - segmentLineage.getLineageEntries().entrySet().iterator(); - while (entryIterator.hasNext()) { - Map.Entry<String, LineageEntry> entry = entryIterator.next(); - String entryId = entry.getKey(); - LineageEntry lineageEntry = entry.getValue(); - - // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard - // the entry as not existing. 
- if (lineageEntry.getState() == LineageEntryState.REVERTED) { - // When 'forceCleanup' is enabled, proactively clean up 'segmentsTo' since it's safe to do so. + // When 'forceCleanup' is enabled, we need to proactively clean up at the following cases: + // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentsFrom' or 'segmentsTo' + // values. This is used to un-block the segment replacement protocol if the previous attempt failed in + // the middle. + // 2. Proactively delete the oldest data snapshot to make sure that we only keep at most 2 data snapshots + // at any time in case of REFRESH use case. if (forceCleanup) { - segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo()); - } - continue; - } + if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && ( + !Collections.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || !Collections.disjoint( + segmentsTo, lineageEntry.getSegmentsTo()))) { + LOGGER.info( + "Detected the incomplete lineage entry with overlapped 'segmentsFrom' or 'segmentsTo'. Deleting or " + + "reverting the lineage entry to unblock the new segment protocol. tableNameWithType={}, " + + "entryId={}, segmentsFrom={}, segmentsTo={}", tableNameWithType, entryId, + lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()); + + // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED' + // Delete or update segmentsTo of the entry to revert to handle the case of rerunning the protocol: + // Initial state: + // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} + // 1. Rerunning the protocol with s4 and s5, s4 should not be deleted to avoid race conditions of + // concurrent data pushes and deletions: + // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: REVERTED} + // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], status: IN_PROGRESS} + // 2. 
Rerunning the protocol with s3 and s4, we can simply remove the 'IN_PROGRESS' entry: + // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} + List<String> segmentsToForEntryToRevert = new ArrayList<>(lineageEntry.getSegmentsTo()); + segmentsToForEntryToRevert.removeAll(segmentsTo); + if (segmentsToForEntryToRevert.isEmpty()) { + // Delete 'IN_PROGRESS' entry if the segmentsTo is empty + entryIterator.remove(); + } else { + // Update the lineage entry to 'REVERTED' + entry.setValue(new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert, + LineageEntryState.REVERTED, System.currentTimeMillis())); + } - // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. - - // When 'forceCleanup' is enabled, we need to proactively clean up at the following cases: - // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentsFrom' or 'segmentsTo' - // values. This is used to un-block the segment replacement protocol if the previous attempt failed in the - // middle. - // 2. Proactively delete the oldest data snapshot to make sure that we only keep at most 2 data snapshots - // at any time in case of REFRESH use case. - if (forceCleanup) { - if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && ( - !Collections.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || !Collections.disjoint(segmentsTo, - lineageEntry.getSegmentsTo()))) { - LOGGER.info( - "Detected the incomplete lineage entry with overlapped 'segmentsFrom' or 'segmentsTo'. Deleting or " - + "reverting the lineage entry to unblock the new segment protocol. 
tableNameWithType={}, " - + "entryId={}, segmentsFrom={}, segmentsTo={}", tableNameWithType, entryId, - lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()); - - // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED' - // Delete or update segmentsTo of the entry to revert to handle the case of rerunning the protocol: - // Initial state: - // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} - // 1. Rerunning the protocol with s4 and s5, s4 should not be deleted to avoid race conditions of - // concurrent data pushes and deletions: - // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: REVERTED} - // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], status: IN_PROGRESS} - // 2. Rerunning the protocol with s3 and s4, we can simply remove the 'IN_PROGRESS' entry: - // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} - List<String> segmentsToForEntryToRevert = new ArrayList<>(lineageEntry.getSegmentsTo()); - segmentsToForEntryToRevert.removeAll(segmentsTo); - if (segmentsToForEntryToRevert.isEmpty()) { - // Delete 'IN_PROGRESS' entry if the segmentsTo is empty - entryIterator.remove(); - } else { - // Update the lineage entry to 'REVERTED' - entry.setValue(new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert, - LineageEntryState.REVERTED, System.currentTimeMillis())); + // Add segments for proactive clean-up. + segmentsToCleanUp.addAll(segmentsToForEntryToRevert); + } else if (lineageEntry.getState() == LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase( + IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && CollectionUtils.isEqualCollection( + segmentsFrom, lineageEntry.getSegmentsTo())) { + // This part of code assumes that we only allow at most 2 data snapshots at a time by proactively + // deleting the older snapshots (for REFRESH tables). + // + // e.g. 
(Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) // previous lineage + // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) // current lineage to be updated + // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to keep 2 data snapshots + // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid the disk space waste. + // + // TODO: make the number of allowed snapshots configurable to allow users to keep at most N snapshots + // of data. We need to traverse the lineage by N steps instead of 2 steps. We can build the + // reverse hash map (segmentsTo -> segmentsFrom) and traverse up to N times before deleting. + LOGGER.info( + "Proactively deleting the replaced segments for REFRESH table to avoid the excessive disk waste. " + + "tableNameWithType={}, segmentsToCleanUp={}", tableNameWithType, + lineageEntry.getSegmentsFrom()); + segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom()); } - - // Add segments for proactive clean-up. - segmentsToCleanUp.addAll(segmentsToForEntryToRevert); - } else if (lineageEntry.getState() == LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase( - IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && CollectionUtils.isEqualCollection( - segmentsFrom, lineageEntry.getSegmentsTo())) { - // This part of code assumes that we only allow at most 2 data snapshots at a time by proactively - // deleting the older snapshots (for REFRESH tables). - // - // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) // previous lineage - // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) // current lineage to be updated - // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to keep 2 data snapshots - // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid the disk space waste. - // - // TODO: make the number of allowed snapshots configurable to allow users to keep at most N snapshots - // of data. We need to traverse the lineage by N steps instead of 2 steps. 
We can build the reverse - // hash map (segmentsTo -> segmentsFrom) and traverse up to N times before deleting. - // - LOGGER.info( - "Proactively deleting the replaced segments for REFRESH table to avoid the excessive disk waste. " - + "tableNameWithType={}, segmentsToCleanUp={}", tableNameWithType, - lineageEntry.getSegmentsFrom()); - segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom()); - } - } else { - // Check that any segment from 'segmentsFrom' does not appear twice. - if (!segmentsFrom.isEmpty()) { - Set<String> segmentsFromInLineageEntry = new HashSet<>(lineageEntry.getSegmentsFrom()); - if (!segmentsFromInLineageEntry.isEmpty()) { - for (String segment : segmentsFrom) { - Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment), - "Segment: %s from 'segmentsFrom' exists in table: %s, entry id: %s as 'segmentsFrom'" - + " (replacing a replaced segment)", segment, tableNameWithType, entryId); + } else { + // Check that any segment from 'segmentsFrom' does not appear twice. 
+ if (!segmentsFrom.isEmpty()) { + Set<String> segmentsFromInLineageEntry = new HashSet<>(lineageEntry.getSegmentsFrom()); + if (!segmentsFromInLineageEntry.isEmpty()) { + for (String segment : segmentsFrom) { + Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment), + "Segment: %s from 'segmentsFrom' exists in table: %s, entry id: %s as 'segmentsFrom'" + + " (replacing a replaced segment)", segment, tableNameWithType, entryId); + } } } - } - if (!segmentsTo.isEmpty()) { - Set<String> segmentsToInLineageEntry = new HashSet<>(lineageEntry.getSegmentsTo()); - if (!segmentsToInLineageEntry.isEmpty()) { - for (String segment : segmentsTo) { - Preconditions.checkState(!segmentsToInLineageEntry.contains(segment), - "Segment: %s from 'segmentsTo' exists in table: %s, entry id: %s as 'segmentTo'" - + " (name conflict)", segment, tableNameWithType, entryId); + if (!segmentsTo.isEmpty()) { + Set<String> segmentsToInLineageEntry = new HashSet<>(lineageEntry.getSegmentsTo()); + if (!segmentsToInLineageEntry.isEmpty()) { + for (String segment : segmentsTo) { + Preconditions.checkState(!segmentsToInLineageEntry.contains(segment), + "Segment: %s from 'segmentsTo' exists in table: %s, entry id: %s as 'segmentTo'" + + " (name conflict)", segment, tableNameWithType, entryId); + } } } } } - } - // Update lineage entry - segmentLineage.addLineageEntry(segmentLineageEntryId, - new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); - - _lineageManager.updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, - segmentLineage); - // Write back to the lineage entry to the property store - if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { - // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it - // is safe to physically delete segments. 
- if (!segmentsToCleanUp.isEmpty()) { - LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp); - deleteSegments(tableNameWithType, segmentsToCleanUp); + // Update lineage entry + segmentLineage.addLineageEntry(segmentLineageEntryId, + new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); + + _lineageManager.updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, + segmentLineage); + // Write back to the lineage entry to the property store + if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { + return true; + } else { + LOGGER.warn("Failed to write segment lineage for table: {}", tableNameWithType); + return false; } - return true; - } else { - LOGGER.warn("Failed to write segment lineage for table: {}", tableNameWithType); - return false; - } - }); - } catch (Exception e) { - String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. " - + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo); - LOGGER.error(errorMsg, e); - throw new RuntimeException(errorMsg, e); + }); + } catch (Exception e) { + String errorMsg = String.format("Failed to update the segment lineage during startReplaceSegments. " + + "(tableName = %s, segmentsFrom = %s, segmentsTo = %s)", tableNameWithType, segmentsFrom, segmentsTo); + LOGGER.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + } + + // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it + // is safe to physically delete segments. 
+ if (!segmentsToCleanUp.isEmpty()) { + LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp); + deleteSegments(tableNameWithType, segmentsToCleanUp); } // Only successful attempt can reach here - LOGGER.info("startReplaceSegments is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, " - + "segmentsTo = {}, segmentLineageEntryId = {})", tableNameWithType, segmentsFrom, segmentsTo, - segmentLineageEntryId); + LOGGER.info("startReplaceSegments is successfully processed in {} ms. (tableNameWithType = {}, segmentsFrom = {}, " Review Comment: Let's also log the number of attempts or even each attempt time? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -3475,167 +3486,168 @@ public String startReplaceSegments(String tableNameWithType, List<String> segmen Preconditions.checkState(!segmentsForTable.contains(segment), "Segment: %s from 'segmentsTo' exists in table: %s", segment, tableNameWithType); } + List<String> segmentsToCleanUp = new ArrayList<>(); + synchronized (getLineageUpdaterLock(tableNameWithType)) { + try { + DEFAULT_RETRY_POLICY.attempt(() -> { + // Fetch table config + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); + + // Fetch the segment lineage metadata + ZNRecord segmentLineageZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); + SegmentLineage segmentLineage; + int expectedVersion; + if (segmentLineageZNRecord == null) { + segmentLineage = new SegmentLineage(tableNameWithType); + expectedVersion = -1; + } else { + segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); + expectedVersion = segmentLineageZNRecord.getVersion(); + } + // Check that the segment lineage entry id doesn't exist in the segment 
lineage + Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) == null, + "Entry id: %s already exists in the segment lineage for table: %s", segmentLineageEntryId, + tableNameWithType); + + Iterator<Map.Entry<String, LineageEntry>> entryIterator = + segmentLineage.getLineageEntries().entrySet().iterator(); + while (entryIterator.hasNext()) { + Map.Entry<String, LineageEntry> entry = entryIterator.next(); + String entryId = entry.getKey(); + LineageEntry lineageEntry = entry.getValue(); + + // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard + // the entry as not existing. + if (lineageEntry.getState() == LineageEntryState.REVERTED) { + // When 'forceCleanup' is enabled, proactively clean up 'segmentsTo' since it's safe to do so. + if (forceCleanup) { + segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo()); + } + continue; + } - try { - DEFAULT_RETRY_POLICY.attempt(() -> { - // Fetch table config - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); - - // Fetch the segment lineage metadata - ZNRecord segmentLineageZNRecord = - SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); - SegmentLineage segmentLineage; - int expectedVersion; - if (segmentLineageZNRecord == null) { - segmentLineage = new SegmentLineage(tableNameWithType); - expectedVersion = -1; - } else { - segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); - expectedVersion = segmentLineageZNRecord.getVersion(); - } - // Check that the segment lineage entry id doesn't exist in the segment lineage - Preconditions.checkState(segmentLineage.getLineageEntry(segmentLineageEntryId) == null, - "Entry id: %s already exists in the segment lineage for table: %s", segmentLineageEntryId, - tableNameWithType); + // By here, the 
lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. - List<String> segmentsToCleanUp = new ArrayList<>(); - Iterator<Map.Entry<String, LineageEntry>> entryIterator = - segmentLineage.getLineageEntries().entrySet().iterator(); - while (entryIterator.hasNext()) { - Map.Entry<String, LineageEntry> entry = entryIterator.next(); - String entryId = entry.getKey(); - LineageEntry lineageEntry = entry.getValue(); - - // If the lineage entry is in 'REVERTED' state, no need to go through the validation because we can regard - // the entry as not existing. - if (lineageEntry.getState() == LineageEntryState.REVERTED) { - // When 'forceCleanup' is enabled, proactively clean up 'segmentsTo' since it's safe to do so. + // When 'forceCleanup' is enabled, we need to proactively clean up at the following cases: + // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentsFrom' or 'segmentsTo' + // values. This is used to un-block the segment replacement protocol if the previous attempt failed in + // the middle. + // 2. Proactively delete the oldest data snapshot to make sure that we only keep at most 2 data snapshots + // at any time in case of REFRESH use case. if (forceCleanup) { - segmentsToCleanUp.addAll(lineageEntry.getSegmentsTo()); - } - continue; - } + if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && ( + !Collections.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || !Collections.disjoint( + segmentsTo, lineageEntry.getSegmentsTo()))) { + LOGGER.info( + "Detected the incomplete lineage entry with overlapped 'segmentsFrom' or 'segmentsTo'. Deleting or " + + "reverting the lineage entry to unblock the new segment protocol. 
tableNameWithType={}, " + + "entryId={}, segmentsFrom={}, segmentsTo={}", tableNameWithType, entryId, + lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()); + + // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED' + // Delete or update segmentsTo of the entry to revert to handle the case of rerunning the protocol: + // Initial state: + // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} + // 1. Rerunning the protocol with s4 and s5, s4 should not be deleted to avoid race conditions of + // concurrent data pushes and deletions: + // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: REVERTED} + // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], status: IN_PROGRESS} + // 2. Rerunning the protocol with s3 and s4, we can simply remove the 'IN_PROGRESS' entry: + // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} + List<String> segmentsToForEntryToRevert = new ArrayList<>(lineageEntry.getSegmentsTo()); + segmentsToForEntryToRevert.removeAll(segmentsTo); + if (segmentsToForEntryToRevert.isEmpty()) { + // Delete 'IN_PROGRESS' entry if the segmentsTo is empty + entryIterator.remove(); + } else { + // Update the lineage entry to 'REVERTED' + entry.setValue(new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert, + LineageEntryState.REVERTED, System.currentTimeMillis())); + } - // By here, the lineage entry is either 'IN_PROGRESS' or 'COMPLETED'. - - // When 'forceCleanup' is enabled, we need to proactively clean up at the following cases: - // 1. Revert the lineage entry when we find the lineage entry with overlapped 'segmentsFrom' or 'segmentsTo' - // values. This is used to un-block the segment replacement protocol if the previous attempt failed in the - // middle. - // 2. Proactively delete the oldest data snapshot to make sure that we only keep at most 2 data snapshots - // at any time in case of REFRESH use case. 
- if (forceCleanup) { - if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && ( - !Collections.disjoint(segmentsFrom, lineageEntry.getSegmentsFrom()) || !Collections.disjoint(segmentsTo, - lineageEntry.getSegmentsTo()))) { - LOGGER.info( - "Detected the incomplete lineage entry with overlapped 'segmentsFrom' or 'segmentsTo'. Deleting or " - + "reverting the lineage entry to unblock the new segment protocol. tableNameWithType={}, " - + "entryId={}, segmentsFrom={}, segmentsTo={}", tableNameWithType, entryId, - lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo()); - - // Delete the 'IN_PROGRESS' entry or update it to 'REVERTED' - // Delete or update segmentsTo of the entry to revert to handle the case of rerunning the protocol: - // Initial state: - // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} - // 1. Rerunning the protocol with s4 and s5, s4 should not be deleted to avoid race conditions of - // concurrent data pushes and deletions: - // Entry1: { segmentsFrom: [s1, s2], segmentsTo: [s3], status: REVERTED} - // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s4, s5], status: IN_PROGRESS} - // 2. Rerunning the protocol with s3 and s4, we can simply remove the 'IN_PROGRESS' entry: - // Entry2: { segmentsFrom: [s1, s2], segmentsTo: [s3, s4], status: IN_PROGRESS} - List<String> segmentsToForEntryToRevert = new ArrayList<>(lineageEntry.getSegmentsTo()); - segmentsToForEntryToRevert.removeAll(segmentsTo); - if (segmentsToForEntryToRevert.isEmpty()) { - // Delete 'IN_PROGRESS' entry if the segmentsTo is empty - entryIterator.remove(); - } else { - // Update the lineage entry to 'REVERTED' - entry.setValue(new LineageEntry(lineageEntry.getSegmentsFrom(), segmentsToForEntryToRevert, - LineageEntryState.REVERTED, System.currentTimeMillis())); + // Add segments for proactive clean-up. 
+ segmentsToCleanUp.addAll(segmentsToForEntryToRevert); + } else if (lineageEntry.getState() == LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase( + IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && CollectionUtils.isEqualCollection( + segmentsFrom, lineageEntry.getSegmentsTo())) { + // This part of code assumes that we only allow at most 2 data snapshots at a time by proactively + // deleting the older snapshots (for REFRESH tables). + // + // e.g. (Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) // previous lineage + // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) // current lineage to be updated + // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to keep 2 data snapshots + // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid the disk space waste. + // + // TODO: make the number of allowed snapshots configurable to allow users to keep at most N snapshots + // of data. We need to traverse the lineage by N steps instead of 2 steps. We can build the + // reverse hash map (segmentsTo -> segmentsFrom) and traverse up to N times before deleting. + LOGGER.info( + "Proactively deleting the replaced segments for REFRESH table to avoid the excessive disk waste. " + + "tableNameWithType={}, segmentsToCleanUp={}", tableNameWithType, + lineageEntry.getSegmentsFrom()); + segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom()); } - - // Add segments for proactive clean-up. - segmentsToCleanUp.addAll(segmentsToForEntryToRevert); - } else if (lineageEntry.getState() == LineageEntryState.COMPLETED && "REFRESH".equalsIgnoreCase( - IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig)) && CollectionUtils.isEqualCollection( - segmentsFrom, lineageEntry.getSegmentsTo())) { - // This part of code assumes that we only allow at most 2 data snapshots at a time by proactively - // deleting the older snapshots (for REFRESH tables). - // - // e.g. 
(Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5) // previous lineage - // (Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8) // current lineage to be updated - // -> proactively delete (Seg_0, Seg_1, Seg_2) since we want to keep 2 data snapshots - // (Seg_3, Seg_4, Seg_5), (Seg_6, Seg_7, Seg_8) only to avoid the disk space waste. - // - // TODO: make the number of allowed snapshots configurable to allow users to keep at most N snapshots - // of data. We need to traverse the lineage by N steps instead of 2 steps. We can build the reverse - // hash map (segmentsTo -> segmentsFrom) and traverse up to N times before deleting. - // - LOGGER.info( - "Proactively deleting the replaced segments for REFRESH table to avoid the excessive disk waste. " - + "tableNameWithType={}, segmentsToCleanUp={}", tableNameWithType, - lineageEntry.getSegmentsFrom()); - segmentsToCleanUp.addAll(lineageEntry.getSegmentsFrom()); - } - } else { - // Check that any segment from 'segmentsFrom' does not appear twice. - if (!segmentsFrom.isEmpty()) { - Set<String> segmentsFromInLineageEntry = new HashSet<>(lineageEntry.getSegmentsFrom()); - if (!segmentsFromInLineageEntry.isEmpty()) { - for (String segment : segmentsFrom) { - Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment), - "Segment: %s from 'segmentsFrom' exists in table: %s, entry id: %s as 'segmentsFrom'" - + " (replacing a replaced segment)", segment, tableNameWithType, entryId); + } else { + // Check that any segment from 'segmentsFrom' does not appear twice. 
+ if (!segmentsFrom.isEmpty()) { + Set<String> segmentsFromInLineageEntry = new HashSet<>(lineageEntry.getSegmentsFrom()); + if (!segmentsFromInLineageEntry.isEmpty()) { + for (String segment : segmentsFrom) { + Preconditions.checkState(!segmentsFromInLineageEntry.contains(segment), + "Segment: %s from 'segmentsFrom' exists in table: %s, entry id: %s as 'segmentsFrom'" + + " (replacing a replaced segment)", segment, tableNameWithType, entryId); + } } } - } - if (!segmentsTo.isEmpty()) { - Set<String> segmentsToInLineageEntry = new HashSet<>(lineageEntry.getSegmentsTo()); - if (!segmentsToInLineageEntry.isEmpty()) { - for (String segment : segmentsTo) { - Preconditions.checkState(!segmentsToInLineageEntry.contains(segment), - "Segment: %s from 'segmentsTo' exists in table: %s, entry id: %s as 'segmentTo'" - + " (name conflict)", segment, tableNameWithType, entryId); + if (!segmentsTo.isEmpty()) { + Set<String> segmentsToInLineageEntry = new HashSet<>(lineageEntry.getSegmentsTo()); + if (!segmentsToInLineageEntry.isEmpty()) { + for (String segment : segmentsTo) { + Preconditions.checkState(!segmentsToInLineageEntry.contains(segment), + "Segment: %s from 'segmentsTo' exists in table: %s, entry id: %s as 'segmentTo'" + + " (name conflict)", segment, tableNameWithType, entryId); + } } } } } - } - // Update lineage entry - segmentLineage.addLineageEntry(segmentLineageEntryId, - new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); - - _lineageManager.updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, - segmentLineage); - // Write back to the lineage entry to the property store - if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { - // Trigger the proactive segment clean up if needed. Once the lineage is updated in the property store, it - // is safe to physically delete segments. 
- if (!segmentsToCleanUp.isEmpty()) { - LOGGER.info("Cleaning up the segments while startReplaceSegments: {}", segmentsToCleanUp); - deleteSegments(tableNameWithType, segmentsToCleanUp); + // Update lineage entry + segmentLineage.addLineageEntry(segmentLineageEntryId, + new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); + + _lineageManager.updateLineageForStartReplaceSegments(tableConfig, segmentLineageEntryId, customMap, + segmentLineage); + // Write back to the lineage entry to the property store + if (SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion)) { Review Comment: Can we log the time for every lineage processing/compute to get a sense of internal lineage reprocessing time? If it takes more time, then we should think of introducing a table level distributed lock. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -3862,49 +3876,50 @@ public void revertReplaceSegments(String tableNameWithType, String segmentLineag * @param lineageUpdateType * @param customMap */ - private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, String lineageEntryId, + private boolean writeLineageEntryWithLock(TableConfig tableConfig, String lineageEntryId, LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, ZkHelixPropertyStore<ZNRecord> propertyStore, LineageUpdateType lineageUpdateType, Map<String, String> customMap) { - for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) { - // Fetch the segment lineage - ZNRecord segmentLineageToUpdateZNRecord = - SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName()); - int expectedVersion = segmentLineageToUpdateZNRecord.getVersion(); - SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord); - LineageEntry currentLineageEntry = 
segmentLineageToUpdate.getLineageEntry(lineageEntryId); - - // If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request. - if (!currentLineageEntry.equals(lineageEntryToMatch)) { - String errorMsg = String.format( - "Aborting the to update lineage entry since we find that the entry has been modified for table %s, " - + "entry id: %s", tableConfig.getTableName(), lineageEntryId); - LOGGER.error(errorMsg); - throw new RuntimeException(errorMsg); - } + String tableNameWithType = tableConfig.getTableName(); + synchronized (getLineageUpdaterLock(tableNameWithType)) { + // Fetch the segment lineage + ZNRecord segmentLineageToUpdateZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName()); + int expectedVersion = segmentLineageToUpdateZNRecord.getVersion(); + SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord); + LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId); + + // If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request. 
+ if (!currentLineageEntry.equals(lineageEntryToMatch)) { + String errorMsg = String.format( + "Aborting the to update lineage entry since we find that the entry has been modified for table %s, " + + "entry id: %s", tableConfig.getTableName(), lineageEntryId); + LOGGER.error(errorMsg); + throw new RuntimeException(errorMsg); + } - // Update lineage entry - segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate); - switch (lineageUpdateType) { - case START: - _lineageManager.updateLineageForStartReplaceSegments(tableConfig, lineageEntryId, customMap, - segmentLineageToUpdate); - break; - case END: - _lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, - segmentLineageToUpdate); - break; - case REVERT: - _lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, - segmentLineageToUpdate); - break; - default: - } + // Update lineage entry + segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate); + switch (lineageUpdateType) { + case START: Review Comment: I think this `case START` won't be hit? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -191,7 +191,8 @@ public class PinotHelixResourceManager { private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 2.0f); private static final int DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY = 10; Review Comment: DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY is not used anymore. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -3862,49 +3876,50 @@ public void revertReplaceSegments(String tableNameWithType, String segmentLineag * @param lineageUpdateType * @param customMap */ - private boolean writeLineageEntryWithTightLoop(TableConfig tableConfig, String lineageEntryId, + private boolean writeLineageEntryWithLock(TableConfig tableConfig, String lineageEntryId, LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, ZkHelixPropertyStore<ZNRecord> propertyStore, LineageUpdateType lineageUpdateType, Map<String, String> customMap) { - for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) { - // Fetch the segment lineage - ZNRecord segmentLineageToUpdateZNRecord = - SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName()); - int expectedVersion = segmentLineageToUpdateZNRecord.getVersion(); - SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord); - LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId); - - // If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request. - if (!currentLineageEntry.equals(lineageEntryToMatch)) { - String errorMsg = String.format( - "Aborting the to update lineage entry since we find that the entry has been modified for table %s, " - + "entry id: %s", tableConfig.getTableName(), lineageEntryId); - LOGGER.error(errorMsg); - throw new RuntimeException(errorMsg); - } + String tableNameWithType = tableConfig.getTableName(); Review Comment: Considering the distributed update from other controllers, I feel it's still better to put a smaller number of retries here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org