klsince commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2025635256


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -270,6 +275,7 @@ public String load(String instanceId) {
       _lineageUpdaterLocks[i] = new Object();
     }
     _lineageManager = lineageManager;
+    _executorService = executorService;

Review Comment:
   Looks like there's no need for `_executorService` and `_connMgr`, since you provide the `_consumingSegmentInfoReader` to PinotHelixResourceManager directly.
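   
   If the reader is injected directly, the resource manager only needs to hold it. A rough sketch, assuming a constructor shape that is not the actual signature in this PR:
   ```
   // sketch: take the already-built reader instead of an executor + connection manager
   // (parameter list abbreviated; the real constructor has more arguments)
   public PinotHelixResourceManager(..., LineageManager lineageManager,
       ConsumingSegmentInfoReader consumingSegmentInfoReader) {
     ...
     _lineageManager = lineageManager;
     _consumingSegmentInfoReader = consumingSegmentInfoReader;
   }
   ```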



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +297,120 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;

Review Comment:
   nit: numServer`s`GettingConsumingSegmentsAdded



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -627,22 +644,42 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig) {
     LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
         tableNameWithType, rebalanceJobId);
+    boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
     int existingReplicationFactor = 0;
     int newReplicationFactor = 0;
     Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
     Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+    Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
 
     for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      String segmentName = entrySet.getKey();
+      Collection<String> segmentStates = entrySet.getValue().values();
+      boolean isSegmentConsuming = existingServersToConsumingSegmentMap != null && segmentStates.stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && segmentStates.stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+
+      for (String instanceName : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+        if (isSegmentConsuming) {
+          existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);

Review Comment:
   Why not just use the segment's CONSUMING state on that particular server to decide the map? e.g.
   ```
   if (existingServersToConsumingSegmentMap != null
       && SegmentStateModel.CONSUMING.equals(entrySet.getValue().get(instanceName))) {
     existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
   }
   ```
   
   IIUC, using the isSegmentConsuming boolean may track a segment as CONSUMING for a server even if it's not CONSUMING on that particular server.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +297,120 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
+    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
+
+    /**
+     * Constructor for ConsumingSegmentToBeMovedSummary
+     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
+     * @param numServerGettingConsumingSegmentsAdded number of servers that will get consuming segments added
+     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up.
+     *                                                           Map from segment name to its number of offsets to
+     *                                                           catch up on the new server. This is essentially the
+     *                                                           difference between the latest offset of the stream
+     *                                                           and the segment's start offset of the stream. Set to
+     *                                                           null if the number of offsets to catch up could not
+     *                                                           be determined for at least one consuming segment
+     * @param oldestConsumingSegmentsToBeMovedInMinutes oldest consuming segments to be moved to catch up. Map from
+     *                                                  segment name to its age in minutes. The age of a segment is
+     *                                                  determined by its creation time from ZK metadata. Set to null
+     *                                                  if ZK metadata is not available or the creation time is not
+     *                                                  found for at least one consuming segment
+     * @param serverConsumingSegmentSummary ConsumingSegmentSummaryPerServer per server
+     */
+    @JsonCreator
+    public ConsumingSegmentToBeMovedSummary(
+        @JsonProperty("numConsumingSegmentsToBeMoved") int numConsumingSegmentsToBeMoved,
+        @JsonProperty("numServerGettingConsumingSegmentsAdded") int numServerGettingConsumingSegmentsAdded,
+        @JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp") @Nullable
+        Map<String, Integer> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
+        @JsonProperty("oldestConsumingSegmentsToBeMovedInMinutes") @Nullable
+        Map<String, Integer> oldestConsumingSegmentsToBeMovedInMinutes,
+        @JsonProperty("serverConsumingSegmentSummary") @Nullable
+        Map<String, ConsumingSegmentSummaryPerServer> serverConsumingSegmentSummary) {
+      _numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
+      _numServerGettingConsumingSegmentsAdded = numServerGettingConsumingSegmentsAdded;
+      _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp = consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+      _oldestConsumingSegmentsToBeMovedInMinutes = oldestConsumingSegmentsToBeMovedInMinutes;
+      _serverConsumingSegmentSummary = serverConsumingSegmentSummary;
+    }
+
+    @JsonProperty
+    public int getNumConsumingSegmentsToBeMoved() {
+      return _numConsumingSegmentsToBeMoved;
+    }
+
+    @JsonProperty
+    public int getNumServerGettingConsumingSegmentsAdded() {
+      return _numServerGettingConsumingSegmentsAdded;
+    }
+
+    @JsonProperty
+    public Map<String, Integer> getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
+      return _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    }
+
+    @JsonProperty
+    public Map<String, Integer> getOldestConsumingSegmentsToBeMovedInMinutes() {
+      return _oldestConsumingSegmentsToBeMovedInMinutes;
+    }
+
+    @JsonProperty
+    public Map<String, ConsumingSegmentSummaryPerServer> getServerConsumingSegmentSummary() {
+      return _serverConsumingSegmentSummary;
+    }
+
+    public static class ConsumingSegmentSummaryPerServer {
+      private final int _numConsumingSegmentToBeAdded;

Review Comment:
   nit: _numConsumingSegment`s`ToBeAdded



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -306,18 +297,120 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentToBeMovedSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    private final int _numServerGettingConsumingSegmentsAdded;
+    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+    private final Map<String, Integer> _oldestConsumingSegmentsToBeMovedInMinutes;
+    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;
+
+    /**
+     * Constructor for ConsumingSegmentToBeMovedSummary
+     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
+     * @param numServerGettingConsumingSegmentsAdded number of servers that will get consuming segments added
+     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up.

Review Comment:
   Got it. So segments with the most minutes to catch up could be different from segments with the most offsets to catch up, since the data's time may not align with the data's offset (e.g. when old data is backfilled).
   
   nit: how about renaming to consumingSegmentsToBeMovedWith`OldestTime`ToCatchUp (or `...MostMins...` or `...MostTime...`)?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -627,22 +644,42 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig) {
     LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
         tableNameWithType, rebalanceJobId);
+    boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
     int existingReplicationFactor = 0;
     int newReplicationFactor = 0;
     Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
     Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
+    Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
 
     for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      String segmentName = entrySet.getKey();
+      Collection<String> segmentStates = entrySet.getValue().values();
+      boolean isSegmentConsuming = existingServersToConsumingSegmentMap != null && segmentStates.stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && segmentStates.stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+
+      for (String instanceName : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+        if (isSegmentConsuming) {
+          existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+        }
       }
     }
 
     for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
       newReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey());
+      String segmentName = entrySet.getKey();
+      boolean isSegmentConsuming = existingServersToConsumingSegmentMap != null && entrySet.getValue()
+          .values()
+          .stream()
+          .allMatch(state -> state.equals(SegmentStateModel.CONSUMING));

Review Comment:
   hmm.. why is the criteria different between here and L664 above? This loop requires allMatch(CONSUMING), while the loop above uses noneMatch(ONLINE) && anyMatch(CONSUMING).
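   
   If both loops should apply the same rule, a shared predicate would keep them in sync. Just a sketch; the helper name isConsumingSegment is made up:
   ```
   // a segment counts as CONSUMING iff no replica is ONLINE and at least one replica is CONSUMING
   private static boolean isConsumingSegment(Map<String, String> instanceStateMap) {
     return instanceStateMap.values().stream().noneMatch(SegmentStateModel.ONLINE::equals)
         && instanceStateMap.values().stream().anyMatch(SegmentStateModel.CONSUMING::equals);
   }
   ```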



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -806,6 +854,154 @@ private List<String> getServerTag(String serverName) {
     return instanceConfig.getTags();
   }
 
+  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(String tableNameWithType,
+      Map<String, Set<String>> newServersToConsumingSegmentMap) {
+    if (newServersToConsumingSegmentMap.isEmpty()) {
+      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
+          new HashMap<>());
+    }
+    int numConsumingSegmentsToBeMoved =
+        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
+    Set<String> uniqueConsumingSegments =
+        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
+    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
+        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+        getConsumingSegmentsOffsetsToCatchUp(tableNameWithType, consumingSegmentZKmetadata);
+    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+        consumingSegmentSummaryPerServer = new HashMap<>();
+    if (consumingSegmentsOffsetsToCatchUp != null) {
+      consumingSegmentsOffsetsToCatchUpTopN = getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp);
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        int totalOffsetsToCatchUp = segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), totalOffsetsToCatchUp));
+      });
+    } else {
+      consumingSegmentsOffsetsToCatchUpTopN = null;
+      newServersToConsumingSegmentMap.forEach((server, segments) -> {
+        consumingSegmentSummaryPerServer.put(server,
+            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+                segments.size(), -1));
+      });
+    }
+
+    Map<String, Integer> consumingSegmentsOldestTopN =
+        consumingSegmentsAge == null ? null : getTopNConsumingSegmentWithValue(consumingSegmentsAge);
+
+    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+        consumingSegmentSummaryPerServer);
+  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue) {
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(TOP_N_IN_CONSUMING_SEGMENT_SUMMARY)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created in ZK, which could be different from
+   * when the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Returns null if the info could not be obtained for any
+   * consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();
+        if (creationTime < 0) {
+          LOGGER.warn("Creation time is not found for segment: {} in table: {}", s, tableNameWithType);
+          throw new RuntimeException("Creation time is not found");
+        }
+        consumingSegmentsAge.put(s, (int) ((now - creationTime) / 60_000));
+      }));
+    } catch (Exception e) {
+      return null;
+    }
+    return consumingSegmentsAge;
+  }
+
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    return _consumingSegmentInfoReader;
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for that consuming segment. Returns null if
+   * the info could not be obtained for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsOffsetsToCatchUp(String tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, SegmentZKMetadata> entry : consumingSegmentZKMetadata.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList =
+            consumingSegmentsInfoMap._segmentToConsumingInfoMap.getOrDefault(segmentName, null);
+        SegmentZKMetadata segmentZKMetadata = entry.getValue();
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        String startOffset = segmentZKMetadata.getStartOffset();
+        if (startOffset == null) {
+          LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null && !consumingSegmentInfoList.isEmpty()) {

Review Comment:
   nit: swap the if and else branches so the error case returns early:
   ```
   if (consumingSegmentInfoList == null || consumingSegmentInfoList.isEmpty()) {
     LOGGER.warn(...);
     return null;
   }
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
