smjn commented on code in PR #19478:
URL: https://github.com/apache/kafka/pull/19478#discussion_r2046235934
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8200,50 +8200,119 @@ public Optional<DeleteShareGroupStateParameters>
shareGroupBuildPartitionDeleteR
/**
* Returns a list of delete share group state request topic objects to be
used with the persister.
- * @param groupId - group ID of the share group
- * @param requestData - the request data for DeleteShareGroupOffsets
request
- * @param errorTopicResponseList - the list of topics not found in the
metadata image
+ * @param groupId group ID of the share group
+ * @param requestData the request data for
DeleteShareGroupOffsets request
+ * @param errorTopicResponseList the list of topics not found in the
metadata image
+ * @param records List of coordinator records to append
to
+ *
* @return List of objects representing the share group state delete
request for topics.
*/
public List<DeleteShareGroupStateRequestData.DeleteStateData>
sharePartitionsEligibleForOffsetDeletion(
String groupId,
DeleteShareGroupOffsetsRequestData requestData,
-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList,
+ List<CoordinatorRecord> records
) {
List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+ Map<Uuid, Set<Integer>> initializedTopics = new HashMap<>();
+
+ ShareGroupStatePartitionMetadataInfo currentMap =
shareGroupPartitionMetadata.get(groupId);
+
+ if (currentMap == null) {
+ return deleteShareGroupStateRequestTopicsData;
+ }
+
+ currentMap.initializedTopics().forEach((topicId, partitions) ->
initializedTopics.put(topicId, new HashSet<>(partitions)));
+ Set<Uuid> deletingTopics = new HashSet<>(currentMap.deletingTopics());
- Map<Uuid, Set<Integer>> initializedSharePartitions =
initializedShareGroupPartitions(groupId);
requestData.topics().forEach(topic -> {
Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
if (topicId != null) {
// A deleteState request to persister should only be sent with
those topic partitions for which corresponding
// share partitions are initialized for the group.
- if (initializedSharePartitions.containsKey(topicId)) {
+ if (currentMap.initializedTopics().containsKey(topicId)) {
List<DeleteShareGroupStateRequestData.PartitionData>
partitions = new ArrayList<>();
- topic.partitions().forEach(partition -> {
- if
(initializedSharePartitions.get(topicId).contains(partition)) {
- partitions.add(new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
- }
- });
- deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
- .setTopicId(topicId)
- .setPartitions(partitions));
+
currentMap.initializedTopics().get(topicId).forEach(partition ->
+ partitions.add(new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
+ deleteShareGroupStateRequestTopicsData.add(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(partitions)
+ );
+ // Removing the topic from initializedTopics map.
+ initializedTopics.remove(topicId);
+ // Adding the topic to deletingTopics map.
+ deletingTopics.add(topicId);
+ } else if (currentMap.deletingTopics().contains(topicId)) {
+ // If the topic for which delete share group offsets
request is sent is already present in the deletingTopics set,
+ // we will include that topic in the delete share group
state request.
+ List<DeleteShareGroupStateRequestData.PartitionData>
partitions = new ArrayList<>();
+
metadataImage.topics().getTopic(topicId).partitions().keySet().forEach(partition
->
+ partitions.add(new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)));
+ deleteShareGroupStateRequestTopicsData.add(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(partitions)
+ );
}
} else {
errorTopicResponseList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
- .setPartitions(topic.partitions().stream().map(
- partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
- ).collect(Collectors.toCollection(ArrayList::new))));
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ );
}
});
+ records.add(
+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+ groupId,
+ attachTopicName(currentMap.initializingTopics()),
+ attachTopicName(initializedTopics),
+ attachTopicName(deletingTopics)
+ )
+ );
+
return deleteShareGroupStateRequestTopicsData;
}
+ /**
+ * Returns a list of {@link
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic}
corresponding to the
+ * topics for which persister delete share group state request was
successful
+ * @param groupId group ID of the share group
+ * @param topics a map of topicId to topic name
+ * @param records List of coordinator records to append
to
+ *
+ * @return List of objects for which request was successful
+ */
+ public
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
completeDeleteShareGroupOffsets(
+ String groupId,
+ Map<Uuid, String> topics,
+ List<CoordinatorRecord> records
+ ) {
+ ShareGroupStatePartitionMetadataInfo currentMap =
shareGroupPartitionMetadata.get(groupId);
Review Comment:
containsKey check needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]