npawar commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r595611251



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,39 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Step-2
+    // Get current partition groups - this gives current state of latest segments for each partition
+    // E.g. [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    // Assume stream has partitions A, B, C, all still consuming. Result will be A, B, C
+    // Assume A was split into D, E, but messages of A are yet to be consumed, result will be A, B, C
+    // Assume A was split into D, E and all messages of A are consumed, result will be B, C, D, E.
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();
+
+    // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata

Review comment:
   The server is still the only one detecting the end of a partition; we set stopReason to "endOfPartitionGroup". The only difference is that we now rely on the controller's new partition group info to decide which segments to create.

   The controller does not try to detect the end of partition groups explicitly. It simply returns the partition groups it thinks should exist, and we create new consuming segments for every group in the new partition groups list that is not already present. This probably looks a little odd right now, because we only create new metadata for the committing segment, but the eventual goal is to create new metadata for all newly detected partitions.
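   A minimal sketch, in plain Java, of the two steps described in the diff comments and above: deriving the new partition-group list from the current one when a group splits, and then creating consuming segments only for groups that do not already have one. The class and method names (`PartitionGroupSketch`, `newPartitionGroups`, `groupsNeedingNewSegment`) are hypothetical and are not Pinot APIs. Partition groups are modeled as string ids (A, B, C, ...) to match the examples, whereas the real code uses integer partition group ids, and split/consumption state is passed in rather than read from a stream. It models the eventual behavior, where every newly detected group gets a segment. Requires Java 9+ for `List.of`/`Map.of`/`Set.of`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the partition-group logic discussed in the review; not Pinot code. */
final class PartitionGroupSketch {

  private PartitionGroupSketch() {
  }

  /**
   * Returns the partition groups to consume next, sorted by id.
   *
   * @param current       ids of the current partition groups, e.g. [A, B, C]
   * @param splits        parent id -> child ids for groups the stream has split, e.g. A -> [D, E]
   * @param fullyConsumed parent ids whose remaining messages have all been consumed
   */
  static List<String> newPartitionGroups(List<String> current, Map<String, List<String>> splits,
      Set<String> fullyConsumed) {
    Set<String> result = new TreeSet<>();
    for (String group : current) {
      List<String> children = splits.get(group);
      if (children != null && fullyConsumed.contains(group)) {
        // Split and drained: the parent is replaced by its children.
        result.addAll(children);
      } else {
        // Not split, or split with messages still left: keep the group as-is.
        result.add(group);
      }
    }
    return new ArrayList<>(result);
  }

  /** Returns the groups in newGroups that have no consuming segment yet. */
  static List<String> groupsNeedingNewSegment(List<String> newGroups, Set<String> withConsumingSegment) {
    List<String> missing = new ArrayList<>();
    for (String group : newGroups) {
      if (!withConsumingSegment.contains(group)) {
        missing.add(group);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    List<String> current = List.of("A", "B", "C");
    Map<String, List<String>> splits = Map.of("A", List.of("D", "E"));
    // A's segment has just committed (DONE); B and C are still IN_PROGRESS.
    Set<String> consuming = Set.of("B", "C");

    List<String> pending = newPartitionGroups(current, splits, Set.of());
    System.out.println(pending + " -> " + groupsNeedingNewSegment(pending, consuming));

    List<String> drained = newPartitionGroups(current, splits, Set.of("A"));
    System.out.println(drained + " -> " + groupsNeedingNewSegment(drained, consuming));
  }
}
```

   Running `main` prints `[A, B, C] -> [A]` and then `[B, C, D, E] -> [D, E]`: while A still has unconsumed messages, only A (whose segment just committed) needs a new consuming segment; once A is drained, its children D and E do.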
