mcvsubbu commented on code in PR #9289:
URL: https://github.com/apache/pinot/pull/9289#discussion_r957635296


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1074,6 +1077,15 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
     // Get the latest segment ZK metadata for each partition
     Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
 
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>();

Review Comment:
   Can you add a comment before this line describing what this map is supposed to contain?
   
   The logic in this class is getting quite hard to read. Could we move some methods into a base class and sub-class the partitionGroup vs. partitionId handling for the two different types of streams we support?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1144,21 +1156,33 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
           // 3. we should never end up with some replicas ONLINE and some OFFLINE.
           if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
             LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
-            StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+            if (partitionGroupIdToSmallestStreamOffset == null) {
+              partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+            }
+            StreamPartitionMsgOffset startOffset =
+                selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+                    partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+                    latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
-                segmentAssignment, instancePartitionsMap, startOffset);
+                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+                instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
               if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
                   && isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) {
                 // If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
                 // Create a new IN_PROGRESS segment in PROPERTYSTORE,
                 // add it as CONSUMING segment to IDEALSTATE.
-                StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+                if (partitionGroupIdToSmallestStreamOffset == null) {

Review Comment:
   It may make things more readable if we get the smallest offset every time. Does fetching it involve multiple calls to the stream, and is that what we are optimizing here? If so, it would be good to add a comment. Otherwise, getting it once unconditionally may make things a bit more readable.
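
   If the lazy fetch is kept to avoid extra calls to the stream, one way to keep the call sites free of repeated null checks is a small memoizing wrapper. This is only a minimal sketch: `LazySmallestOffsets` is a hypothetical helper that does not exist in Pinot, and `Long` stands in for `StreamPartitionMsgOffset` so that the example is self-contained.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper, not part of Pinot: wraps an expensive fetch so it runs at most once.
// Long is a stand-in for StreamPartitionMsgOffset.
final class LazySmallestOffsets {
  private final Supplier<Map<Integer, Long>> _fetcher;
  private Map<Integer, Long> _cached;
  private int _fetchCount;

  LazySmallestOffsets(Supplier<Map<Integer, Long>> fetcher) {
    _fetcher = fetcher;
  }

  // Returns the cached map, calling the fetcher only on first use.
  // Assumes the fetcher never returns null; a null result would trigger another fetch.
  Map<Integer, Long> get() {
    if (_cached == null) {
      _cached = _fetcher.get();
      _fetchCount++;
    }
    return _cached;
  }

  // Number of times the underlying fetch actually ran.
  int getFetchCount() {
    return _fetchCount;
  }
}
```

   With something like this, each branch would call `get()` and the comment explaining the optimization would live in one place, on the wrapper.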



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1254,21 +1261,39 @@ private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelSt
         instancePartitionsMap);
   }
 
-  @Nullable
-  private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
+  private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
     OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
     streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
-    List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
+    List<PartitionGroupMetadata> partitionGroupMetadataList =
         getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
     streamConfig.setOffsetCriteria(originalOffsetCriteria);
-    StreamPartitionMsgOffset partitionStartOffset = null;
-    for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
-      if (info.getPartitionGroupId() == partitionGroupId) {
-        partitionStartOffset = info.getStartOffset();
-        break;
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
+    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+      partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+    }
+    return partitionGroupIdToSmallestOffset;
+  }
+
+  private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,

Review Comment:
   Can you consider removing `offsetCriteria` from the arguments here, and handling a non-null `offsetCriteria` outside this method? I'm not sure whether it will make the logic more readable, but it is worth a try, I think.
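
   Roughly, the split could look like the sketch below. It is a simplified illustration and not the PR's code: `Long` stands in for `StreamPartitionMsgOffset`, a `String` stands in for `OffsetCriteria`, the class and the `resolveStartOffset` method are hypothetical, and the fallback rule (use the stream's smallest offset when the stored offset is older than it) is an assumption about what the method is meant to do.

```java
import java.util.Map;

// Hypothetical, simplified sketch; not the actual Pinot implementation.
final class StartOffsetSelection {
  private StartOffsetSelection() {
  }

  // Criteria-free selection. Assumed rule: if the stored offset is older than the
  // smallest offset still available in the stream, start from the smallest offset.
  static long selectStartOffset(int partitionGroupId, Map<Integer, Long> smallestOffsets, long storedOffset) {
    Long smallest = smallestOffsets.get(partitionGroupId);
    if (smallest != null && smallest > storedOffset) {
      return smallest;
    }
    return storedOffset;
  }

  // Caller-side handling of a non-null criteria, kept outside selectStartOffset.
  static long resolveStartOffset(String offsetCriteria, int partitionGroupId,
      Map<Integer, Long> criteriaOffsets, Map<Integer, Long> smallestOffsets, long storedOffset) {
    if (offsetCriteria != null) {
      Long criteriaOffset = criteriaOffsets.get(partitionGroupId);
      if (criteriaOffset != null) {
        return criteriaOffset;
      }
    }
    return selectStartOffset(partitionGroupId, smallestOffsets, storedOffset);
  }
}
```

   Whether this reads better depends on how many call sites need the criteria branch; if both do, the branch would need to be shared rather than duplicated.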



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

