mcvsubbu commented on code in PR #9289: URL: https://github.com/apache/pinot/pull/9289#discussion_r957635296
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1074,6 +1077,15 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS // Get the latest segment ZK metadata for each partition Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName); + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>(); Review Comment: Can you add a comment before this line explaining what this map is supposed to contain? The logic in this class is getting quite hard to read; can we even base-class some methods and sub-class the partitionGroup vs partitionId for the two different types of streams we support? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1144,21 +1156,33 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS // 3. we should never end up with some replicas ONLINE and some OFFLINE. 
if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) { LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName); - StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset()); + if (partitionGroupIdToSmallestStreamOffset == null) { + partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig); + } + StreamPartitionMsgOffset startOffset = + selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset, + partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, + latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, - segmentAssignment, instancePartitionsMap, startOffset); + newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, + instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted() && isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) { // If we get here, that means in IdealState, the latest segment has all replicas ONLINE. // Create a new IN_PROGRESS segment in PROPERTYSTORE, // add it as CONSUMING segment to IDEALSTATE. - StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset()); + if (partitionGroupIdToSmallestStreamOffset == null) { Review Comment: It may make things more readable if we can get the smallest offset all the time? Does it involve multiple calls to the stream, and is that what we are optimizing here? If so, good to add a comment. Otherwise, getting it once unconditionally may make things a bit more readable. 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ########## @@ -1254,21 +1261,39 @@ private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelSt instancePartitionsMap); } - @Nullable - private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) { + private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) { OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA); - List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata = + List<PartitionGroupMetadata> partitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); streamConfig.setOffsetCriteria(originalOffsetCriteria); - StreamPartitionMsgOffset partitionStartOffset = null; - for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) { - if (info.getPartitionGroupId() == partitionGroupId) { - partitionStartOffset = info.getStartOffset(); - break; + Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>(); + for (PartitionGroupMetadata metadata : partitionGroupMetadataList) { + partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); + } + return partitionGroupIdToSmallestOffset; + } + + private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId, Review Comment: Can you consider removing the `offsetCriteria` from the argument here, and incorporating the logic to deal with a non-null value of `offsetCriteria` outside this method? Not sure if it will make the logic more readable, but worth a try, I think -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org