J-HowHuang commented on code in PR #15368:
URL: https://github.com/apache/pinot/pull/15368#discussion_r2013095417
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -733,15 +793,79 @@ private RebalanceSummaryResult
calculateDryRunSummary(Map<String, Map<String, St
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to
rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentSummary consumingSegmentSummary =
+ isOfflineTable ? null : new
RebalanceSummaryResult.ConsumingSegmentSummary(
+ consumingSegmentsToBeMoved, maxBytesToCatchUpForConsumingSegments,
bytesToCatchUpForServers);
RebalanceSummaryResult.SegmentInfo segmentInfo = new
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes,
totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas, consumingSegmentSummary);
LOGGER.info("Calculated rebalance summary for table: {} with
rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
return new RebalanceSummaryResult(serverInfo, segmentInfo);
}
+ @VisibleForTesting
+ ConsumingSegmentInfoReader getConsumingSegmentInfoReader() {
+ if (_executorService == null || _connectionManager == null ||
_pinotHelixResourceManager == null) {
+ return null;
+ }
+ return new ConsumingSegmentInfoReader(_executorService,
_connectionManager, _pinotHelixResourceManager);
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the
number of bytes to catch up for each consuming
+ * segment. Returns a map from segment name to the number of bytes to catch
up for that consuming segment. Return
+ * null if failed to obtain info for any consuming segment.
+ */
+ private Map<String, Integer> getConsumingSegmentsBytesToCatchUp(String
tableNameWithType) {
+ ConsumingSegmentInfoReader consumingSegmentInfoReader =
getConsumingSegmentInfoReader();
+ if (consumingSegmentInfoReader == null) {
+ LOGGER.warn("ConsumingSegmentInfoReader is null, cannot calculate
consuming segments info for table: {}",
+ tableNameWithType);
+ return null;
+ }
+ Map<String, Integer> segmentToBytesToCatchUp = new HashMap<>();
+ try {
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentsInfoMap =
+
consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 30_000);
+ for (Map.Entry<String,
List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> entry
+ : consumingSegmentsInfoMap._segmentToConsumingInfoMap.entrySet()) {
+ String segmentName = entry.getKey();
+ List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>
consumingSegmentInfoList = entry.getValue();
+ SegmentZKMetadata segmentZKMetadata =
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
tableNameWithType,
+ segmentName);
+ if (segmentZKMetadata == null) {
+ LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table:
{}", segmentName, tableNameWithType);
+ return null;
+ }
+ if (consumingSegmentInfoList != null) {
+ String startOffset = segmentZKMetadata.getStartOffset();
+ if (startOffset == null) {
+ LOGGER.warn("Start offset is null for segment: {} in table: {}",
segmentName, tableNameWithType);
+ return null;
+ }
+ if (!consumingSegmentInfoList.isEmpty()) {
+ // this value should be the same regardless of which server the
consuming segment info is from, use the
+ // first in the list here
+ int bytesToCatchUp =
consumingSegmentInfoList.get(0)._partitionOffsetInfo._latestUpstreamOffsetMap.values()
Review Comment:
No. It's calculating (latest offset - start offset), and that's why we need
the segment ZK metadata involved in this case.
Kinesis is not available at this moment (see the #known-issue section in
this PR). I haven't dug deeply into that yet, but from tracing the error I
suspect it has something to do with their shard naming convention.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]