J-HowHuang commented on code in PR #16341:
URL: https://github.com/apache/pinot/pull/16341#discussion_r2211202414


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -387,6 +391,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
           tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
     }
 
+    if (downtime && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {

Review Comment:
   I think `SegmentCommitter` checks whether this is `null` to decide whether to fall back to a peer URL. We should use the same check here; otherwise we miss the case where `peerSegmentDownloadScheme=""` (though I'm not sure an empty string is allowed).
   
   
https://github.com/apache/pinot/blob/d953d7cffcd2626cbe78648c7e599544e22bab96/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java#L99
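A minimal sketch of the mismatch described above, written as two hypothetical helpers (the class and method names are illustrative, not Pinot code). Commons Lang's `StringUtils.isEmpty` returns `true` for both `null` and `""`, so the PR's check treats an empty scheme as "peer download disabled", while a plain `!= null` check, which the comment says the committer uses, treats it as enabled:

```java
// Hypothetical helpers contrasting two ways of deciding whether peer download is enabled.
final class PeerDownloadCheck {

  private PeerDownloadCheck() {
  }

  // Equivalent to !StringUtils.isEmpty(scheme): null and "" both mean "disabled".
  static boolean isEnabledPerPr(String peerSegmentDownloadScheme) {
    return peerSegmentDownloadScheme != null && !peerSegmentDownloadScheme.isEmpty();
  }

  // The null-only check the comment attributes to the segment committer:
  // any non-null value, including "", means "enabled".
  static boolean isEnabledPerCommitter(String peerSegmentDownloadScheme) {
    return peerSegmentDownloadScheme != null;
  }

  public static void main(String[] args) {
    String[] schemes = {null, "", "http"};
    for (String scheme : schemes) {
      String label = scheme == null ? "null" : "\"" + scheme + "\"";
      System.out.printf("scheme=%s perPr=%b perCommitter=%b%n",
          label, isEnabledPerPr(scheme), isEnabledPerCommitter(scheme));
    }
  }
}
```

The two checks agree for `null` and `"http"` and disagree only for `""`, which is exactly the case the comment is concerned about.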



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1865,63 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    boolean hasDataLossRisk(String segmentName);
+  }
+
+  private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final HelixManager _helixManager;
+    private final boolean _isPeerDownloadEnabled;
+    private final boolean _isUpsertOrDedupTable;
+    private final boolean _isPauselessEnabled;
+
+    private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, HelixManager helixManager) {
+      _tableNameWithType = tableNameWithType;
+      _helixManager = helixManager;
+      _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+      _isUpsertOrDedupTable = tableConfig.isUpsertEnabled() || tableConfig.isDedupEnabled();
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public boolean hasDataLossRisk(String segmentName) {
+      // If peer-download is disabled, no data loss risk exists
+      if (!_isPeerDownloadEnabled) {
+        return false;
+      }
+
+      SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+          .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        return false;
+      }
+
+      // If the segment state is COMPLETED and the peer download URL is empty, there is a data loss risk
+      if (!segmentZKMetadata.getStatus().isCompleted()) {

Review Comment:
   Agree with @somandal: unless the table is upsert/dedup, the worst outcome of moving a `COMMITTING` segment is that it ends up in ERROR state, if the new host of the segment can't wait until the commit finishes during its `OFFLINE -> ONLINE` transition:
   
   
https://github.com/apache/pinot/blob/d488bd1e0881ca93f4ed50d7be63e8424590050f/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java#L574-L587
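A hedged sketch of the distinction in the comment above. The enum is a simplified, hypothetical stand-in for the realtime segment status (its `isCompleted()` treating `DONE` and `UPLOADED` as completed is an assumption), and `hasDataLossRisk` here is illustrative, not the PR's method. A `COMMITTING` segment is flagged only for upsert/dedup tables; a completed segment is flagged when its download URL is empty, per the code comment in the hunk:

```java
// Hypothetical sketch of the COMMITTING vs. completed distinction from the review.
final class CommittingSegmentRisk {

  // Simplified stand-in for a realtime segment's status; not Pinot's enum.
  enum SegmentStatus {
    IN_PROGRESS, COMMITTING, DONE, UPLOADED;

    // Assumption: DONE and UPLOADED are the "completed" states.
    boolean isCompleted() {
      return this == DONE || this == UPLOADED;
    }
  }

  private CommittingSegmentRisk() {
  }

  static boolean hasDataLossRisk(SegmentStatus status, String downloadUrl, boolean isUpsertOrDedupTable) {
    if (status == SegmentStatus.COMMITTING) {
      // Per the review: for non-upsert/dedup tables, the worst case is the new host giving up
      // during OFFLINE -> ONLINE and the segment going to ERROR, which is not data loss.
      return isUpsertOrDedupTable;
    }
    if (status.isCompleted()) {
      // Per the hunk's code comment: a completed segment with an empty download URL is at risk.
      return downloadUrl == null || downloadUrl.isEmpty();
    }
    // IN_PROGRESS (consuming) segments are out of scope for this sketch.
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasDataLossRisk(SegmentStatus.COMMITTING, "", false)); // false
    System.out.println(hasDataLossRisk(SegmentStatus.COMMITTING, "", true));  // true
    System.out.println(hasDataLossRisk(SegmentStatus.DONE, "", false));       // true
  }
}
```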



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1889,70 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    boolean hasDataLossRisk(String segmentName);
+  }
+
+  private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final TableConfig _tableConfig;
+    private final int _minAvailableReplicas;
+    private final HelixManager _helixManager;
+    private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+    private final boolean _isPeerDownloadEnabled;
+    private final boolean _isPauselessEnabled;
+
+    private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, int minAvailableReplicas,
+        HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      _tableNameWithType = tableNameWithType;
+      _tableConfig = tableConfig;
+      _minAvailableReplicas = minAvailableReplicas;
+      _helixManager = helixManager;
+      _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+      _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public boolean hasDataLossRisk(String segmentName) {
+      // If peer-download is disabled, or minAvailableReplicas > 0 no data loss risk exists
+      if (!_isPeerDownloadEnabled || _minAvailableReplicas > 0) {
+        return false;
+      }

Review Comment:
   nit: since you already have the `DataLossRiskAssessor` interface, could we have, for example, a `PeerDownloadTableDataLossRiskAssessor` and a `NoopDataLossRiskAssessor`? That would be easier to read and maintain if we add more assessments like this in the future.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +433,30 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
               tierToInstancePartitionsMap, rebalanceConfig);
         }
       }
+
+      // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+      // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+      // for a table
+      if (!StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {

Review Comment:
   Check `getPeerSegmentDownloadScheme() != null` instead, as in the previous comment.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -387,6 +391,24 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
           tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
     }
 
+    if (downtime && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {
+      if (!forceDowntime) {
+        // Don't allow downtime rebalance if peer-download is enabled as it can result in data loss
+        // The best way to rebalance peer-download enabled tables is to:
+        // - Ensure that all segments have their deep-store copy available
+        // - Pause ingestion to prevent the creation of new segments during rebalance
+        // - set forceDowntime=true and re-try running the rebalance
+        String errorMsg = "Peer-download enabled tables cannot undergo downtime rebalance due to the potential for "
+            + "data loss, validate all segments exist in deep store and pause ingestion prior to setting "
+            + "forceDowntime=true to override the downtime flag";
+        tableRebalanceLogger.error(errorMsg);
+        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, instancePartitionsMap,

Review Comment:
   Should we add an `onReturnFailure` call here?
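A sketch of the guard with a failure hook, under loudly stated assumptions: `FailureListener`, its `onReturnFailure(String)` signature, the trimmed-down `Result` class, and `checkDowntimeAllowed` are all hypothetical stand-ins, since the real `onReturnFailure` helper is not shown in this hunk. The sketch also uses the `!= null` scheme check suggested earlier in this review. The point is only the ordering: notify on failure, then return the FAILED result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch: report the failure through a hook before returning the FAILED result.
final class DowntimeGuardSketch {

  enum Status { FAILED }

  // Hypothetical, trimmed-down stand-in for RebalanceResult.
  static final class Result {
    final Status _status;
    final String _description;

    Result(Status status, String description) {
      _status = status;
      _description = description;
    }
  }

  // Hypothetical stand-in for whatever onReturnFailure notifies (signature assumed).
  @FunctionalInterface
  interface FailureListener {
    void onReturnFailure(String errorMsg);
  }

  private DowntimeGuardSketch() {
  }

  // Returns a FAILED result if the guard trips; Optional.empty() lets the rebalance proceed.
  static Optional<Result> checkDowntimeAllowed(boolean downtime, boolean forceDowntime,
      String peerSegmentDownloadScheme, FailureListener listener) {
    if (downtime && peerSegmentDownloadScheme != null && !forceDowntime) {
      String errorMsg = "Peer-download enabled tables cannot undergo downtime rebalance due to the potential for "
          + "data loss, validate all segments exist in deep store and pause ingestion prior to setting "
          + "forceDowntime=true to override the downtime flag";
      // Notify first, so the failure is recorded and not only returned to the caller.
      listener.onReturnFailure(errorMsg);
      return Optional.of(new Result(Status.FAILED, errorMsg));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<String> failures = new ArrayList<>();
    Optional<Result> result = checkDowntimeAllowed(true, false, "http", failures::add);
    System.out.println(result.isPresent() + " notified=" + failures.size()); // true notified=1
  }
}
```

The same pattern would apply to the `minAvailableReplicas == 0` guard further down, which returns FAILED in the same way.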



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -489,6 +535,27 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
       minAvailableReplicas = numCurrentAssignmentReplicas;
     }
 
+    // Don't allow rebalance if peer-download is enabled but minAvailableReplicas = 0 (which is similar to downtime
+    // rebalance where we can drop to 0 replicas during rebalance)
+    if (minAvailableReplicas == 0
+        && !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme())) {
+      if (!forceDowntime) {
+        // Don't allow minAvailableReplicas=0 rebalance if peer-download is enabled, as it can result in data loss.
+        // The best way to rebalance peer-download enabled tables is to:
+        // - Ensure that all segments have their deep-store copy available
+        // - Pause ingestion to prevent the creation of new segments during rebalance
+        // - set forceDowntime=true and re-try running the rebalance
+        String errorMsg = "Peer-download enabled tables with cannot set minAvailableReplicas=0 for rebalance due to "
+            + "the potential for data loss, validate all segments exist in deep store and pause ingestion prior to "
+            + "setting forceDowntime=true to override the downtime flag";
+        tableRebalanceLogger.error(errorMsg);
+        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, instancePartitionsMap,

Review Comment:
   Should we add an `onReturnFailure` call here too?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1889,70 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    boolean hasDataLossRisk(String segmentName);
+  }
+
+  private static class DataLossRiskAssessorImpl implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final TableConfig _tableConfig;
+    private final int _minAvailableReplicas;
+    private final HelixManager _helixManager;
+    private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+    private final boolean _isPeerDownloadEnabled;
+    private final boolean _isPauselessEnabled;
+
+    private DataLossRiskAssessorImpl(String tableNameWithType, TableConfig tableConfig, int minAvailableReplicas,
+        HelixManager helixManager, PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      _tableNameWithType = tableNameWithType;
+      _tableConfig = tableConfig;
+      _minAvailableReplicas = minAvailableReplicas;
+      _helixManager = helixManager;
+      _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+      _isPeerDownloadEnabled = !StringUtils.isEmpty(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme());
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public boolean hasDataLossRisk(String segmentName) {
+      // If peer-download is disabled, or minAvailableReplicas > 0 no data loss risk exists
+      if (!_isPeerDownloadEnabled || _minAvailableReplicas > 0) {
+        return false;
+      }

Review Comment:
   Also, I think we need to exclude the case where `numReplica > 1` and `minAvailableReplicas=-1`; there are no data loss concerns in that case.
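A sketch of that distinction, assuming Pinot's semantics for a negative `minAvailableReplicas` (the maximum number of replicas allowed to be unavailable, so the effective minimum is `numReplicas + minAvailableReplicas`, floored at 0), and capping positive values at `numReplicas`, which the `minAvailableReplicas = numCurrentAssignmentReplicas;` context line in the rebalancer hunk appears to do. The helper names are hypothetical. The idea is that the assessor compares the effective minimum against 0 rather than the raw configured value:

```java
// Hypothetical helpers: decide data loss exposure from the effective, not raw, minAvailableReplicas.
final class MinAvailableReplicasSketch {

  private MinAvailableReplicasSketch() {
  }

  // Assumption: a negative configured value means "at most |value| replicas may be unavailable".
  static int effectiveMinAvailableReplicas(int configured, int numReplicas) {
    if (configured >= 0) {
      return Math.min(configured, numReplicas);
    }
    return Math.max(numReplicas + configured, 0);
  }

  // Only a rebalance that may drop a segment to zero live replicas is a concern for a
  // peer-download table; any effective minimum >= 1 keeps at least one replica serving.
  static boolean mayDropToZeroReplicas(int configured, int numReplicas) {
    return effectiveMinAvailableReplicas(configured, numReplicas) == 0;
  }

  public static void main(String[] args) {
    System.out.println(mayDropToZeroReplicas(-1, 3)); // false: 2 replicas stay up
    System.out.println(mayDropToZeroReplicas(-1, 1)); // true: the single replica may go down
    System.out.println(mayDropToZeroReplicas(0, 3));  // true
  }
}
```

With this, the early return in `hasDataLossRisk` would key off `mayDropToZeroReplicas(...)` instead of `_minAvailableReplicas > 0`, so `numReplica > 1` with `minAvailableReplicas=-1` is no longer flagged.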



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
