npawar commented on a change in pull request #7058:
URL: https://github.com/apache/pinot/pull/7058#discussion_r788084691
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -588,17 +588,46 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
lock.unlock();
}
- // TODO: also create the new partition groups here, instead of waiting till the {@link
- // RealtimeSegmentValidationManager} runs
+ // Creates new partition groups here instead of waiting till the {@link RealtimeSegmentValidationManager} runs
// E.g. If current state is A, B, C, and newPartitionGroupMetadataList contains B, C, D, E,
- // then create metadata/idealstate entries for D, E along with the committing partition's entries.
- // Ensure that multiple committing segments don't create multiple new segment metadata and ideal state entries
- // for the same partitionGroup
+ // then metadata/idealstate entries for D, E are created along with the committing partition's entries.
+
+ addNewPartitionGroups(realtimeTableName, tableConfig, instancePartitions, idealState, numReplicas, streamConfig,
+ newPartitionGroupMetadataList, numPartitionGroups, segmentAssignment, instancePartitionsMap);
// Trigger the metadata event notifier
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
}
+ /**
+ * Method is kept synchronised so that multiple committing segments don't create multiple new segment metadata
+ * and ideal state entries for the same partitionGroup
+ */
+ private synchronized void addNewPartitionGroups(String realtimeTableName, TableConfig tableConfig,
+ InstancePartitions instancePartitions, IdealState idealState, int numReplicas,
+ PartitionLevelStreamConfig streamConfig, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
+ int numPartitionGroups, SegmentAssignment segmentAssignment,
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+ Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap =
+ getLatestSegmentZKMetadataMap(realtimeTableName);
+
+ Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+ for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
+ int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+ if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
+ String newSegmentName =
+ setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, getCurrentTimeMs(),
Review comment:
I think adding `synchronized` just to `setupNewPartitionGroup` won't help
with the race condition.
In both invocations of `setupNewPartitionGroup`, the calculation
`Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap =
getLatestSegmentZKMetadataMap(realtimeTableName);` happens without any
synchronization. So either thread can have a stale view of the latest
partitions that exist in ZK, even if `setupNewPartitionGroup` is synchronized.
What will help is making the `ensureAllPartitionsConsuming` method also
call `addNewPartitionGroups` at the end to set up new partition groups.
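
To illustrate the idea, here is a minimal sketch, not the actual Pinot code. The class `PartitionGroupSetupSketch`, the in-memory map standing in for ZK metadata, and the segment naming are all hypothetical. The point it shows is that the existence check and the creation sit under the same lock, and that every caller (commit path or validation path) goes through that one method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the controller; none of these names are Pinot APIs.
class PartitionGroupSetupSketch {
  // Assumption: an in-memory map stands in for segment ZK metadata per partition group id.
  private final Map<Integer, String> _metadataStore = new ConcurrentHashMap<>();
  // Counts how many partition groups were set up, so duplicates would be visible.
  private final AtomicInteger _setupCalls = new AtomicInteger();

  // Single entry point: the "does it exist?" read and the creation share one lock.
  synchronized List<String> addNewPartitionGroups(List<Integer> partitionGroupIds) {
    List<String> created = new ArrayList<>();
    for (int partitionGroupId : partitionGroupIds) {
      // The latest state is read inside the lock, so it cannot be stale here.
      if (!_metadataStore.containsKey(partitionGroupId)) {
        String segmentName = "segment__" + partitionGroupId + "__0"; // hypothetical name
        _metadataStore.put(partitionGroupId, segmentName);
        _setupCalls.incrementAndGet();
        created.add(segmentName);
      }
    }
    return created;
  }

  int getSetupCalls() {
    return _setupCalls.get();
  }

  int getNumPartitionGroups() {
    return _metadataStore.size();
  }

  public static void main(String[] args) throws InterruptedException {
    PartitionGroupSetupSketch controller = new PartitionGroupSetupSketch();
    List<Integer> discovered = Arrays.asList(1, 2, 3, 4);
    // Both paths call the same synchronized method instead of checking on their own.
    Thread commitPath = new Thread(() -> controller.addNewPartitionGroups(discovered));
    Thread validationPath = new Thread(() -> controller.addNewPartitionGroups(discovered));
    commitPath.start();
    validationPath.start();
    commitPath.join();
    validationPath.join();
    System.out.println(controller.getSetupCalls() + " setups for "
        + controller.getNumPartitionGroups() + " partition groups");
  }
}
```

With this shape, whichever thread enters second sees the entries written by the first and creates nothing, so each partition group is set up once. If either caller computed the "latest" map before entering the lock, the duplicate-creation window would reopen.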