J-HowHuang commented on code in PR #16341: URL: https://github.com/apache/pinot/pull/16341#discussion_r2211202414
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -387,6 +391,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } + if (downtime && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) { Review Comment: I think `SegmentCommitter` checks whether this is `null` to decide whether to fall back to a peer URL. We should do the same check here; otherwise we fail to catch the case when `peerSegmentDownloadScheme=""` (though I'm not sure whether an empty string is allowed) https://github.com/apache/pinot/blob/d953d7cffcd2626cbe78648c7e599544e22bab96/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java#L99 ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1813,9 +1865,63 @@ public int fetch(String segmentName) { } } + @VisibleForTesting + @FunctionalInterface + interface DataLossRiskAssessor { + boolean hasDataLossRisk(String segmentName); + } + + private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor { + private final String _tableNameWithType; + private final HelixManager _helixManager; + private final boolean _isPeerDownloadEnabled; + private final boolean _isUpsertOrDedupTable; + private final boolean _isPauselessEnabled; + + private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, HelixManager helixManager) { + _tableNameWithType = tableNameWithType; + _helixManager = helixManager; + _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()); + _isUpsertOrDedupTable = tableConfig.isUpsertEnabled() || tableConfig.isDedupEnabled(); + _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); + } + + @Override + public
boolean hasDataLossRisk(String segmentName) { + // If peer-download is disabled, no data loss risk exists + if (!_isPeerDownloadEnabled) { + return false; + } + + SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider + .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName); + if (segmentZKMetadata == null) { + return false; + } + + // If the segment state is COMPLETED and the peer download URL is empty, there is a data loss risk + if (!segmentZKMetadata.getStatus().isCompleted()) { Review Comment: Agree with @somandal: unless the table is upsert/dedup, the worst outcome of moving a `COMMITTING` segment is segments ending up in ERROR state, if the new host of the segment couldn't wait until the commit is done during its `OFFLINE -> ONLINE` transition: https://github.com/apache/pinot/blob/d488bd1e0881ca93f4ed50d7be63e8424590050f/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java#L574-L587 ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1813,9 +1889,70 @@ public int fetch(String segmentName) { } } + @VisibleForTesting + @FunctionalInterface + interface DataLossRiskAssessor { + boolean hasDataLossRisk(String segmentName); + } + + private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor { + private final String _tableNameWithType; + private final TableConfig _tableConfig; + private final int _minAvailableReplicas; + private final HelixManager _helixManager; + private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; + private final boolean _isPeerDownloadEnabled; + private final boolean _isPauselessEnabled; + + private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, int minAvailableReplicas, + HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) { + _tableNameWithType = tableNameWithType; + _tableConfig
= tableConfig; + _minAvailableReplicas = minAvailableReplicas; + _helixManager = helixManager; + _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager; + _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()); + _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); + } + + @Override + public boolean hasDataLossRisk(String segmentName) { + // If peer-download is disabled, or minAvailableReplicas > 0 no data loss risk exists + if (!_isPeerDownloadEnabled || _minAvailableReplicas > 0) { + return false; + } Review Comment: nit: since you already have the `DataLossRiskAssessor` interface, can we have, for example, a `PeerDownloadTableDataLossRiskAssessor` and a `NoopDataLossRiskAssessor`? That would be easier to maintain and read if we add more assessments like this in the future. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -411,6 +433,30 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb tierToInstancePartitionsMap, rebalanceConfig); } } + + // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance + // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled + // for a table + if (!StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) { Review Comment: Check `getPeerSegmentDownloadScheme() != null` instead, as in the previous comment ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -387,6 +391,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } + if (downtime &&
!StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) { + if (!forceDowntime) { + // Don't allow downtime rebalance if peer-download is enabled as it can result in data loss + // The best way to rebalance peer-download enabled tables is to: + // - Ensure that all segments have their deep-store copy available + // - Pause ingestion to prevent the creation of new segments during rebalance + // - set forceDowntime=true and re-try running the rebalance + String errorMsg = "Peer-download enabled tables cannot undergo downtime rebalance due to the potential for " + + "data loss, validate all segments exist in deep store and pause ingestion prior to setting " + + "forceDowntime=true to override the downtime flag"; + tableRebalanceLogger.error(errorMsg); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, instancePartitionsMap, Review Comment: Should add an `onReturnFailure` call here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -489,6 +535,27 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb minAvailableReplicas = numCurrentAssignmentReplicas; } + // Don't allow rebalance if peer-download is enabled but minAvailableReplicas = 0 (which is similar to downtime + // rebalance where we can drop to 0 replicas during rebalance) + if (minAvailableReplicas == 0 + && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) { + if (!forceDowntime) { + // Don't allow minAvailableReplicas=0 rebalance if peer-download is enabled, as it can result in data loss. 
+ // The best way to rebalance peer-download enabled tables is to: + // - Ensure that all segments have their deep-store copy available + // - Pause ingestion to prevent the creation of new segments during rebalance + // - set forceDowntime=true and re-try running the rebalance + String errorMsg = "Peer-download enabled tables with cannot set minAvailableReplicas=0 for rebalance due to " + + "the potential for data loss, validate all segments exist in deep store and pause ingestion prior to " + + "setting forceDowntime=true to override the downtime flag"; + tableRebalanceLogger.error(errorMsg); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, instancePartitionsMap, Review Comment: Should add an `onReturnFailure` call here too ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1813,9 +1889,70 @@ public int fetch(String segmentName) { } } + @VisibleForTesting + @FunctionalInterface + interface DataLossRiskAssessor { + boolean hasDataLossRisk(String segmentName); + } + + private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor { + private final String _tableNameWithType; + private final TableConfig _tableConfig; + private final int _minAvailableReplicas; + private final HelixManager _helixManager; + private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; + private final boolean _isPeerDownloadEnabled; + private final boolean _isPauselessEnabled; + + private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, int minAvailableReplicas, + HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) { + _tableNameWithType = tableNameWithType; + _tableConfig = tableConfig; + _minAvailableReplicas = minAvailableReplicas; + _helixManager = helixManager; + _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager; + _isPeerDownloadEnabled = 
!StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme()); + _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig); + } + + @Override + public boolean hasDataLossRisk(String segmentName) { + // If peer-download is disabled, or minAvailableReplicas > 0 no data loss risk exists + if (!_isPeerDownloadEnabled || _minAvailableReplicas > 0) { + return false; + } Review Comment: Also, I think we'll have to exclude the case when `numReplica > 1` and `minAvailableReplicas=-1`; there are no data loss concerns in this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org