npawar commented on a change in pull request #7058:
URL: https://github.com/apache/pinot/pull/7058#discussion_r788084691
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -588,17 +588,46 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
       lock.unlock();
     }
 
-    // TODO: also create the new partition groups here, instead of waiting till the {@link
-    // RealtimeSegmentValidationManager} runs
+    // Creates new partition groups here instead of waiting till the {@link RealtimeSegmentValidationManager} runs
     // E.g. If current state is A, B, C, and newPartitionGroupMetadataList contains B, C, D, E,
-    // then create metadata/idealstate entries for D, E along with the committing partition's entries.
-    // Ensure that multiple committing segments don't create multiple new segment metadata and ideal state entries
-    // for the same partitionGroup
+    // then metadata/idealstate entries for D, E are created along with the committing partition's entries.
+
+    addNewPartitionGroups(realtimeTableName, tableConfig, instancePartitions, idealState, numReplicas, streamConfig,
+        newPartitionGroupMetadataList, numPartitionGroups, segmentAssignment, instancePartitionsMap);
 
     // Trigger the metadata event notifier
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
   }
 
+  /**
+   * Method is kept synchronised so that multiple committing segments don't create multiple new segment metadata
+   * and ideal state entries for the same partitionGroup
+   */
+  private synchronized void addNewPartitionGroups(String realtimeTableName, TableConfig tableConfig,
+      InstancePartitions instancePartitions, IdealState idealState, int numReplicas,
+      PartitionLevelStreamConfig streamConfig, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
+      int numPartitionGroups, SegmentAssignment segmentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap =
+        getLatestSegmentZKMetadataMap(realtimeTableName);
+
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+      int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+      if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
+        String newSegmentName =
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, getCurrentTimeMs(),

Review comment:
I think adding `synchronized` just to `setupNewPartitionGroup` won't help with the race condition. In both invocations of `setupNewPartitionGroup`, the calculation `Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);` happens without any synchronization. So either thread can have a stale view of the latest partitions that exist in ZK, even if `setupNewPartitionGroup` itself is synchronized.

What will help is making the `ensureAllPartitionsConsuming` method also call `addNewPartitionGroups` at the end to set up new partition groups. That way both code paths read the latest ZK metadata and create the missing partition groups under the same lock.
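To illustrate the suggestion, here is a minimal sketch of the check-then-act pattern with both the read and the create done under one lock. It is not Pinot code: the class `PartitionGroupSetupSketch`, its in-memory map standing in for ZK, and the simplified method signatures are all assumptions made for illustration. Only the method names `addNewPartitionGroups` and `ensureAllPartitionsConsuming` are borrowed from the discussion above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the controller; not the real PinotLLCRealtimeSegmentManager. */
class PartitionGroupSetupSketch {
  // In-memory stand-in for the segment metadata kept in ZK: partitionGroupId -> segment name.
  private final Map<Integer, String> _segmentMetadata = new HashMap<>();
  // Number of partition group setups performed; guarded by this object's lock.
  private int _setupCalls = 0;

  PartitionGroupSetupSketch(List<Integer> existingPartitionGroupIds) {
    for (int id : existingPartitionGroupIds) {
      _segmentMetadata.put(id, "segment__" + id + "__0");
    }
  }

  /**
   * The existence check and the creation both run inside the same synchronized
   * method, so no caller can act on a stale view of which groups already exist.
   */
  synchronized void addNewPartitionGroups(List<Integer> newPartitionGroupIds) {
    for (int id : newPartitionGroupIds) {
      if (!_segmentMetadata.containsKey(id)) {
        _segmentMetadata.put(id, "segment__" + id + "__0");
        _setupCalls++;
      }
    }
  }

  // Simplified commit path: sets up any newly discovered partition groups.
  void commitSegment(List<Integer> newPartitionGroupIds) {
    addNewPartitionGroups(newPartitionGroupIds);
  }

  // Simplified validation path: delegates to the same synchronized method.
  void ensureAllPartitionsConsuming(List<Integer> newPartitionGroupIds) {
    addNewPartitionGroups(newPartitionGroupIds);
  }

  synchronized int setupCalls() {
    return _setupCalls;
  }

  /** Runs both paths concurrently and returns how many setups were performed. */
  static int runBothPathsConcurrently() {
    PartitionGroupSetupSketch controller = new PartitionGroupSetupSketch(Arrays.asList(0, 1, 2));
    List<Integer> discovered = Arrays.asList(1, 2, 3, 4);
    Thread commit = new Thread(() -> controller.commitSegment(discovered));
    Thread validation = new Thread(() -> controller.ensureAllPartitionsConsuming(discovered));
    commit.start();
    validation.start();
    try {
      commit.join();
      validation.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return controller.setupCalls();
  }

  public static void main(String[] args) {
    System.out.println("setups performed: " + runBothPathsConcurrently());
  }
}
```

In this sketch groups 3 and 4 are each set up once regardless of which thread wins the lock. If the existence check were moved outside the synchronized method, both threads could see the groups as missing and set them up twice, which is the race described in the review comment.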