npawar commented on a change in pull request #7058:
URL: https://github.com/apache/pinot/pull/7058#discussion_r785662017



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -588,17 +588,46 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
       lock.unlock();
     }
 
-    // TODO: also create the new partition groups here, instead of waiting till the {@link
-    //  RealtimeSegmentValidationManager} runs
+    //  Creates new partition groups here instead of waiting till the {@link RealtimeSegmentValidationManager} runs
     //  E.g. If current state is A, B, C, and newPartitionGroupMetadataList contains B, C, D, E,
-    //  then create metadata/idealstate entries for D, E along with the committing partition's entries.
-    //  Ensure that multiple committing segments don't create multiple new segment metadata and ideal state entries
-    //  for the same partitionGroup
+    //  then metadata/idealstate entries for D, E are created along with the committing partition's entries.
+
+    addNewPartitionGroups(realtimeTableName, tableConfig, instancePartitions, idealState, numReplicas, streamConfig,
+        newPartitionGroupMetadataList, numPartitionGroups, segmentAssignment, instancePartitionsMap);
 
     // Trigger the metadata event notifier
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
   }
 
+  /**
+   * Method is kept synchronised so that multiple committing segments don't create multiple new segment metadata
+   * and ideal state entries for the same partitionGroup
+   */
+  private synchronized void addNewPartitionGroups(String realtimeTableName, TableConfig tableConfig,
+      InstancePartitions instancePartitions, IdealState idealState, int numReplicas,
+      PartitionLevelStreamConfig streamConfig, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
+      int numPartitionGroups, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap =
+        getLatestSegmentZKMetadataMap(realtimeTableName);
+
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+      if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
+        String newSegmentName =
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, getCurrentTimeMs(),

Review comment:
Could there be a race condition between this call and the
RealtimeSegmentValidationManager? See the other invocation of
`setupNewPartitionGroup`, which runs in the controller's periodic background
thread.
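
To make the concern concrete: in Java, `synchronized` on an instance method only excludes other threads that take the same monitor, so it would not serialize against a second caller that reaches `setupNewPartitionGroup` through a different, unsynchronized path. The sketch below is a hypothetical stand-in, not Pinot code: `PartitionGroupSetupSketch`, `setupIfAbsent`, and the segment-name format are made up for illustration, and an in-memory map stands in for state that in Pinot lives in ZooKeeper and the ideal state. It only shows the general idea of putting the "check, then create" step behind one atomic operation shared by every caller, instead of relying on a lock held by just one of them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Pinot.
public class PartitionGroupSetupSketch {
  // Stand-in for "latest segment per partition group" (assumption: in-memory, not ZooKeeper).
  private final Map<Integer, String> firstSegmentByPartitionGroup = new ConcurrentHashMap<>();
  // Counts how many times the (simulated) setup work actually ran.
  private final AtomicInteger setupCalls = new AtomicInteger();

  // Shared entry point for every caller. ConcurrentHashMap.computeIfAbsent performs the
  // check and the creation atomically per key, so the setup runs at most once per id.
  public String setupIfAbsent(int partitionGroupId) {
    return firstSegmentByPartitionGroup.computeIfAbsent(partitionGroupId, id -> {
      setupCalls.incrementAndGet();
      return "table__" + id + "__0"; // made-up segment name format
    });
  }

  public int getSetupCalls() {
    return setupCalls.get();
  }

  // Simulates two independent callers (e.g. a commit path and a periodic task)
  // racing to set up the same new partition group; returns how many setups ran.
  public static int runDemo() {
    PartitionGroupSetupSketch sketch = new PartitionGroupSetupSketch();
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CountDownLatch start = new CountDownLatch(1);
    for (int i = 0; i < 2; i++) {
      pool.submit(() -> {
        try {
          start.await(); // release both callers at the same time
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        sketch.setupIfAbsent(3);
      });
    }
    start.countDown();
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("callers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sketch.getSetupCalls();
  }

  public static void main(String[] args) {
    System.out.println("setup calls: " + runDemo());
  }
}
```

Whether the two real code paths can actually overlap depends on what locking the periodic task already does, which this sketch does not model.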




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


