somandal commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2013047959


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+        isOfflineTable ? null : new RebalanceSummaryResult.ConsumingSegmentSummary(
+            consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments, bytesToCatchUpForServers);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
 
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of bytes to catch up for each consuming
+   * segment. Returns a map from segment name to the number of bytes to catch up for that consuming segment. Return
+   * null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String tableNameWithType) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }
+    Map<String, Integer> segmentToBytesToCatchUp = new HashMap<>();
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap consumingSegmentsInfoMap =
+          consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+      for (Map.Entry<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> entry
+          : consumingSegmentsInfoMap._segmentToConsumingInfoMap.entrySet()) {
+        String segmentName = entry.getKey();
+        List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> consumingSegmentInfoList = entry.getValue();
+        SegmentZKMetadata segmentZKMetadata =
+            ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType,
+                segmentName);
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: {}", segmentName, tableNameWithType);
+          return null;
+        }
+        if (consumingSegmentInfoList != null) {
+          String startOffset = segmentZKMetadata.getStartOffset();
+          if (startOffset == null) {
+            LOGGER.warn("Start offset is null for segment: {} in table: {}", segmentName, tableNameWithType);
+            return null;
+          }
+          if (!consumingSegmentInfoList.isEmpty()) {
+            // this value should be the same regardless of which server the consuming segment info is from, use the
+            // first in the list here
+            int bytesToCatchUp = consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()

Review Comment:
   I am not that familiar with this code, but how is this calculating `bytes` rather than the number of offsets to catch up? It looks to me like you take the start offset and subtract it from the latest upstream offset, which would give an offset delta, not bytes?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -243,6 +244,8 @@ private enum LineageUpdateType {
   private final LineageManager _lineageManager;
   private final RebalancePreChecker _rebalancePreChecker;
   private TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+  private HttpClientConnectionManager _connectionManager;

Review Comment:
   As mentioned, let's not create this here and pass it in; instead, create it directly in the code where it is needed.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -124,10 +131,15 @@ public class TableRebalancer {
   private final ControllerMetrics _controllerMetrics;
   private final RebalancePreChecker _rebalancePreChecker;
   private final TableSizeReader _tableSizeReader;
+  private final ExecutorService _executorService;
+  private final HttpClientConnectionManager _connectionManager;

Review Comment:
   Let's not pass this in if it can be avoided.
   
   In the `DefaultPreChecker` we create a connection manager. I would recommend doing the same here:
   ```
       try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
    ...
    ...
       }
   ```
   
   Also recommend using a try-with-resources block as mentioned above, so that the connection manager is automatically closed when it is no longer needed.
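   
   A rough sketch of what this could look like (assumption only: the reader is constructed inside the rebalance summary path, reusing the `_executorService` and `_pinotHelixResourceManager` fields already referenced in this diff):
   ```java
   // Sketch: create the connection manager locally so try-with-resources closes it when done.
   try (PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager()) {
     ConsumingSegmentInfoReader consumingSegmentInfoReader =
         new ConsumingSegmentInfoReader(_executorService, connectionManager, _pinotHelixResourceManager);
     // ... fetch consuming segment info and compute the catch-up numbers here ...
   }
   ```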



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -693,6 +738,21 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       if (segmentsAdded > 0) {
         serversGettingNewSegments.add(server);
       }
+      if (!isOfflineTable) {
+        if (bytesToCatchUpForServers != null) {
+          bytesToCatchUpForServers.put(server, 0);
+        }
+        for (String segment : newSegmentSet) {
+          if (consumingSegments.contains(segment)) {
+            consumingSegmentsToBeMoved++;
+            if (bytesToCatchUpForSegments != null) {

Review Comment:
   Can we track the scenario where `bytesToCatchUpForSegments` is null, so that from the summary we can easily tell whether the information is accurate or not?
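   
   One possible way to surface that (sketch only; the field name is hypothetical, not from this PR) is an explicit flag on the summary alongside the nullable byte fields:
   ```java
   // Hypothetical field on ConsumingSegmentSummary: true only if the per-segment catch-up info was
   // successfully fetched, so readers of the summary know whether the numbers are trustworthy.
   private final boolean _catchUpInfoAvailable;
   ```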



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Map<String, Integer> _bytesConsumingSegmentsToCatchUpPerServer;

Review Comment:
   I would also add the age of the oldest consuming segment per server here, if possible.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+        isOfflineTable ? null : new RebalanceSummaryResult.ConsumingSegmentSummary(
+            consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments, bytesToCatchUpForServers);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas, consumingSegmentSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
     return new RebalanceSummaryResult(serverInfo, segmentInfo);
   }
 
+  @VisibleForTesting
+  ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+    if (_executorService == null || _connectionManager == null || _pinotHelixResourceManager == null) {
+      return null;
+    }
+    return new ConsumingSegmentInfoReader(_executorService, _connectionManager, _pinotHelixResourceManager);
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the number of bytes to catch up for each consuming
+   * segment. Returns a map from segment name to the number of bytes to catch up for that consuming segment. Return
+   * null if failed to obtain info for any consuming segment.
+   */
+  private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String tableNameWithType) {
+    ConsumingSegmentInfoReader consumingSegmentInfoReader = getConsumingSegmentInfoReader();
+    if (consumingSegmentInfoReader == null) {
+      LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate consuming segments info for table: {}",
+          tableNameWithType);
+      return null;
+    }

Review Comment:
   Can we have an entry and exit log for this function? My concern is that since this also fetches ZK metadata, it can get expensive when there are lots of segments. The logs would help identify how long this calculation takes.
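   
   A minimal sketch of what the entry/exit logging could look like (log messages are illustrative, not from this PR):
   ```java
   long startTimeMs = System.currentTimeMillis();
   LOGGER.info("Fetching consuming segment info for table: {}", tableNameWithType);
   // ... existing logic: fetch consuming segment info and per-segment ZK metadata ...
   LOGGER.info("Fetched consuming segment info for table: {} in {}ms", tableNameWithType,
       System.currentTimeMillis() - startTimeMs);
   ```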



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -693,6 +738,21 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       if (segmentsAdded > 0) {
         serversGettingNewSegments.add(server);
       }
+      if (!isOfflineTable) {
+        if (bytesToCatchUpForServers != null) {
+          bytesToCatchUpForServers.put(server, 0);
+        }

Review Comment:
   Let's create an entry here only if this server has new segments added. Move this into the for loop below, within the `if (bytesToCatchUpForSegments != null) {` condition.
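   
   Roughly (a sketch based on the variable names in this diff):
   ```java
   for (String segment : newSegmentSet) {
     if (consumingSegments.contains(segment)) {
       consumingSegmentsToBeMoved++;
       if (bytesToCatchUpForSegments != null) {
         // Create the per-server entry only once we know this server actually receives a consuming segment.
         bytesToCatchUpForServers.putIfAbsent(server, 0);
         int bytesToCatchUp = bytesToCatchUpForSegments.getOrDefault(segment, 0);
         maxBytesToCatchUpForConsumingSegments = Math.max(maxBytesToCatchUpForConsumingSegments, bytesToCatchUp);
         bytesToCatchUpForServers.put(server, bytesToCatchUpForServers.get(server) + bytesToCatchUp);
       }
     }
   }
   ```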



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -693,6 +738,21 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
       if (segmentsAdded > 0) {
         serversGettingNewSegments.add(server);
       }
+      if (!isOfflineTable) {
+        if (bytesToCatchUpForServers != null) {
+          bytesToCatchUpForServers.put(server, 0);
+        }
+        for (String segment : newSegmentSet) {
+          if (consumingSegments.contains(segment)) {
+            consumingSegmentsToBeMoved++;
+            if (bytesToCatchUpForSegments != null) {
+              int bytesToCatchUp = bytesToCatchUpForSegments.getOrDefault(segment, 0);
+              maxBytesToCatchUpForConsumingSegments = Math.max(maxBytesToCatchUpForConsumingSegments, bytesToCatchUp);
+              bytesToCatchUpForServers.put(server, bytesToCatchUpForServers.get(server) + bytesToCatchUp);

Review Comment:
   Is it possible to use `putIfAbsent` / `computeIfAbsent` here to initialize to 0?
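   
   For example (a sketch using the same variable names as this diff), `Map#merge` initializes and accumulates in one call:
   ```java
   // Puts bytesToCatchUp if the server has no entry yet, otherwise adds it to the existing value.
   bytesToCatchUpForServers.merge(server, bytesToCatchUp, Integer::sum);
   ```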



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;

Review Comment:
   Can we also track the difference in time (i.e. current time - segment creation time)?
   
   We can have multiple scenarios:
   - Segment was created a long time ago but has very few events
   - Segment was created recently but has many events
   - Segment was created a long time ago and has many events
   - Segment was created recently and has very few events
   
   It might be good to get the above perspective.
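   
   A rough sketch of how that age could be computed while the ZK metadata is already in hand (assuming `SegmentZKMetadata#getCreationTime` returns the segment creation time in millis):
   ```java
   // Age of the consuming segment: how long ago it was created, relative to now.
   long segmentAgeMs = System.currentTimeMillis() - segmentZKMetadata.getCreationTime();
   ```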



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1965,6 +1969,10 @@ public void registerTableSizeReader(TableSizeReader tableSizeReader) {
     _tableSizeReader = tableSizeReader;
   }
 
+  public void registerConnectionManager(HttpClientConnectionManager connectionManager) {

Review Comment:
   Remove this after you modify the code to create the connection manager directly.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;

Review Comment:
   Also, I thought we had discussed capturing the top 10 (or top X) segments and dumping their offset difference and time difference. Here I see you've only captured the max?
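   
   A sketch of one way to keep a top-K view instead of only the max (the `offsetsToCatchUpForSegments` map name is hypothetical, not from this PR):
   ```java
   // Take the 10 segments with the largest catch-up lag from a segment -> offsets-to-catch-up map.
   List<Map.Entry<String, Integer>> topSegments = offsetsToCatchUpForSegments.entrySet().stream()
       .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
       .limit(10)
       .collect(Collectors.toList());
   ```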



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -240,6 +240,45 @@ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
     }
   }
 
+  public static class ConsumingSegmentSummary {
+    private final int _numConsumingSegmentsToBeMoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Integer _maxBytesConsumingSegmentsToCatchUp;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Map<String, Integer> _bytesConsumingSegmentsToCatchUpPerServer;

Review Comment:
   Again, is bytes correct here, or should it be the number of events (offsets)?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -664,6 +686,29 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St
     Set<String> serversGettingNewSegments = new HashSet<>();
     Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
     int segmentsNotMoved = 0;
+    Set<String> consumingSegments = new HashSet<>();
+    boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
+    Integer consumingSegmentsToBeMoved = null;
+    Integer maxBytesToCatchUpForConsumingSegments = null;
+    Map<String, Integer> bytesToCatchUpForSegments = null;
+    Map<String, Integer> bytesToCatchUpForServers = null;
+    if (!isOfflineTable) {
+      consumingSegmentsToBeMoved = 0;
+      bytesToCatchUpForSegments = getConsumingSegmentsBytesToCatchUp(tableNameWithType);

Review Comment:
   This is an expensive operation, and we don't really need to calculate it for REALTIME tables if no consuming segments are to be moved. Can we only run this when we know for sure that CONSUMING segments are involved in the rebalance (i.e. make sure at least one consuming segment is moving)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

