smjn commented on code in PR #19431:
URL: https://github.com/apache/kafka/pull/19431#discussion_r2036833556
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1248,29 +1248,118 @@ private CompletableFuture<Map<String, Errors>>
persisterDeleteToGroupIdErrorMap(
});
}
- private void populateDeleteShareGroupOffsetsFuture(
+ private CompletableFuture<DeleteShareGroupOffsetsResponseData>
checkInitializedSharePartitionsAndProcess(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList =
+ new ArrayList<>(requestData.topics().size());
+
+ return runtime.scheduleReadOperation(
+ "share-group-initialized-partitions",
+ topicPartitionFor(groupId),
+ (coordinator, offset) ->
coordinator.initializedShareGroupPartitions(groupId)
+ ).thenCompose(topicPartitionMap -> {
+ requestData.topics().forEach(topic -> {
+ Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+ if (topicId != null) {
+ // A deleteState request to persister should only be sent
with those topic partitions for which corresponding
+ // share partitions are initialized for the group.
+ if (topicPartitionMap.containsKey(topicId)) {
+ requestTopicIdToNameMapping.put(topicId,
topic.topicName());
+ List<DeleteShareGroupStateRequestData.PartitionData>
partitions = new ArrayList<>();
+ topic.partitions().forEach(partition -> {
+ if
(topicPartitionMap.get(topicId).contains(partition)) {
+ partitions.add(new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
+ }
+ });
+ deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(partitions));
+ }
+ } else {
+ deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName())
+ .setPartitions(topic.partitions().stream().map(
+ partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ).toList()));
+ }
+ });
+
+ // If the request for the persister is empty, just complete the
operation right away.
+ if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ return CompletableFuture.completedFuture(new
DeleteShareGroupOffsetsResponseData().setResponses(deleteShareGroupOffsetsResponseTopicList));
+ }
+
+ return sendPersisterDeleteStateRequest(
+ requestData,
+ requestTopicIdToNameMapping,
+ deleteShareGroupStateRequestTopicsData,
+ deleteShareGroupOffsetsResponseTopicList
+ );
+ }).exceptionally(throwable -> {
+ log.error("Failed to get initialized topic partitions for the
group {}", groupId, throwable);
+ return
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
+ });
+ }
+
+ private CompletableFuture<DeleteShareGroupOffsetsResponseData>
checkIfGroupIsEmptyAndProcess(
Review Comment:
Suggest renaming this method to just `processShareGroupOffsetsDelete`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]