npawar commented on a change in pull request #6667: URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r595611251
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -448,14 +494,39 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Step-2
+    // Get current partition groups - this gives current state of latest segments for each partition
+    // E.g. [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Fetches new partition groups, given current partition groups metadata.
+    // Assume stream has partitions A, B, C, all still consuming. Result will be A, B, C
+    // Assume A was split into D, E, but messages of A are yet to be consumed, result will be A, B, C
+    // Assume A was split into D, E and all messages of A are consumed, result will be B, C, D, E.
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();
+
+    // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata

Review comment:
The server is still the only one trying to detect end of partition. We are setting stopReason as "endOfPartitionGroup". Only difference is that we are relying on the controller's "new partition group info" to decide which segments to create.
The controller is not trying to detect the end of partition groups explicitly. It simply returns the partition groups that it thinks should exist, and we create new consuming segments for every group in that list that is not already present. It probably looks a little odd right now, because we only create new metadata for the committing segment, but the eventual goal is to create new metadata for all newly detected partitions.
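
To make the intent concrete, here is a minimal sketch of the selection logic described above. It is not the code in this PR: partition group ids are plain integers (A=0, B=1, C=2, D=3, E=4 from the example in the diff comments), and the class and method names are hypothetical, chosen only for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; none of these names exist in Pinot.
class PartitionGroupSketch {

  // Eventual goal: every partition group the controller reports that does not
  // already have a consuming segment gets new segment metadata.
  static Set<Integer> groupsNeedingNewSegment(Set<Integer> newPartitionGroups,
      Set<Integer> groupsWithConsumingSegment) {
    Set<Integer> result = new TreeSet<>(newPartitionGroups);
    result.removeAll(groupsWithConsumingSegment);
    return result;
  }

  // Current behaviour in the diff: a successor segment is created for the
  // committing segment only if its group is still in the new list.
  static boolean shouldCreateSuccessor(int committingGroupId, Set<Integer> newPartitionGroups) {
    return newPartitionGroups.contains(committingGroupId);
  }

  public static void main(String[] args) {
    // A (0) was split into D (3) and E (4), and all messages of A are consumed,
    // so the controller reports B, C, D, E.
    Set<Integer> newGroups = Set.of(1, 2, 3, 4);
    // B and C already have consuming segments.
    Set<Integer> consuming = Set.of(1, 2);

    System.out.println(groupsNeedingNewSegment(newGroups, consuming)); // prints [3, 4]
    System.out.println(shouldCreateSuccessor(0, newGroups));           // prints false
  }
}
```

In this sketch the controller never decides that A has ended. A simply stops appearing in the list, so no successor is created for it, while D and E show up as groups without a consuming segment.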