deepthi912 commented on code in PR #15933:
URL: https://github.com/apache/pinot/pull/15933#discussion_r2112959646


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3840,63 +3843,71 @@ public RebalanceResult rebalanceTable(String 
tableNameWithType, TableConfig tabl
     return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
   }
 
-  /**
-   * Calculate the target tier the segment belongs to and set it in segment ZK 
metadata as goal state, which can be
-   * checked by servers when loading the segment to put it onto the target 
storage tier.
-   */
+  /// Calculates the target tier for the segments within a table, updates the 
segment ZK metadata and persists the
+  /// update to ZK.
   @VisibleForTesting
   Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String 
tableNameWithType, TableConfig tableConfig) {
-    List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
     List<Tier> sortedTiers =
-        CollectionUtils.isNotEmpty(tierCfgs) ? 
TierConfigUtils.getSortedTiersForStorageType(tierCfgs,
-            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager) : 
Collections.emptyList();
+        CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList()) ? 
getSortedTiers(tableConfig) : List.of();
     LOGGER.info("For rebalanceId: {}, updating target tiers for segments of 
table: {} with tierConfigs: {}",
         rebalanceJobId, tableNameWithType, sortedTiers);
     Map<String, Set<String>> tierToSegmentsMap = new HashMap<>();
     for (String segmentName : getSegmentsFor(tableNameWithType, true)) {
-      String tier = updateSegmentTargetTier(tableNameWithType, segmentName, 
sortedTiers);
+      ZNRecord segmentMetadataZNRecord = 
getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+      if (segmentMetadataZNRecord == null) {
+        LOGGER.warn("Failed to find ZK metadata for segment: {} of table: {}, 
skipping updating target tier",
+            segmentName, tableNameWithType);
+        continue;
+      }
+      SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadataZNRecord);
+      String tier = segmentZKMetadata.getTier();
+      if (updateSegmentTargetTier(tableNameWithType, segmentZKMetadata, 
sortedTiers)) {
+        if (updateZkMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadataZNRecord.getVersion())) {
+          tier = segmentZKMetadata.getTier();
+        } else {
+          LOGGER.warn("Failed to persist ZK metadata for segment: {} of table: 
{} because of version change, skipping "
+              + "updating target tier", segmentName, tableNameWithType);
+        }
+      }
       if (tier != null) {
         tierToSegmentsMap.computeIfAbsent(tier, t -> new 
HashSet<>()).add(segmentName);
       }
     }
     return tierToSegmentsMap;
   }
 
-  private String updateSegmentTargetTier(String tableNameWithType, String 
segmentName, List<Tier> sortedTiers) {
-    ZNRecord segmentMetadataZNRecord = 
getSegmentMetadataZnRecord(tableNameWithType, segmentName);
-    if (segmentMetadataZNRecord == null) {
-      LOGGER.debug("No ZK metadata for segment: {} of table: {}", segmentName, 
tableNameWithType);
-      return null;
-    }
+  /// Calculates the target tier for the segment and set it into the segment 
ZK metadata. Returns `true` if the segment
+  /// ZK metadata is updated, `false` otherwise. This method does NOT persist 
the segment ZK metadata update to ZK.
+  public boolean updateSegmentTargetTier(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
+      List<Tier> sortedTiers) {
+    String segmentName = segmentZKMetadata.getSegmentName();
     Tier targetTier = null;
     for (Tier tier : sortedTiers) {
       TierSegmentSelector tierSegmentSelector = tier.getSegmentSelector();
-      if (tierSegmentSelector.selectSegment(tableNameWithType, segmentName)) {
+      if (tierSegmentSelector.selectSegment(tableNameWithType, 
segmentZKMetadata)) {
         targetTier = tier;
         break;
       }
     }
-    SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadataZNRecord);
-    String targetTierName = null;
     if (targetTier == null) {
       if (segmentZKMetadata.getTier() == null) {
         LOGGER.debug("Segment: {} of table: {} is already set to go to default 
tier", segmentName, tableNameWithType);
-        return null;
+        return false;
       }
       LOGGER.info("Segment: {} of table: {} is put back on default tier", 
segmentName, tableNameWithType);
+      segmentZKMetadata.setTier(null);
+      return true;
     } else {
-      targetTierName = targetTier.getName();
+      String targetTierName = targetTier.getName();
       if (targetTierName.equals(segmentZKMetadata.getTier())) {
         LOGGER.debug("Segment: {} of table: {} is already set to go to target 
tier: {}", segmentName, tableNameWithType,
             targetTierName);
-        return targetTierName;
+        return false;
       }
       LOGGER.info("Segment: {} of table: {} is put onto new tier: {}", 
segmentName, tableNameWithType, targetTierName);
+      segmentZKMetadata.setTier(targetTierName);
+      return true;
     }
-    // Update the tier in segment ZK metadata and write it back to ZK.
-    segmentZKMetadata.setTier(targetTierName);
-    updateZkMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadataZNRecord.getVersion());

Review Comment:
   Wondering, If `enableParallelPushProtection` is set to false, then are we 
not going to updateZkMetadata?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to