smjn commented on code in PR #19431:
URL: https://github.com/apache/kafka/pull/19431#discussion_r2036832379
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1248,29 +1248,118 @@ private CompletableFuture<Map<String, Errors>>
persisterDeleteToGroupIdErrorMap(
});
}
- private void populateDeleteShareGroupOffsetsFuture(
+ private CompletableFuture<DeleteShareGroupOffsetsResponseData>
checkInitializedSharePartitionsAndProcess(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList =
+ new ArrayList<>(requestData.topics().size());
+
+ return runtime.scheduleReadOperation(
+ "share-group-initialized-partitions",
+ topicPartitionFor(groupId),
+ (coordinator, offset) ->
coordinator.initializedShareGroupPartitions(groupId)
+ ).thenCompose(topicPartitionMap -> {
+ requestData.topics().forEach(topic -> {
+ Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+ if (topicId != null) {
+ // A deleteState request to persister should only be sent with those topic partitions for which corresponding
+ // share partitions are initialized for the group.
+ if (topicPartitionMap.containsKey(topicId)) {
+ requestTopicIdToNameMapping.put(topicId,
topic.topicName());
+ List<DeleteShareGroupStateRequestData.PartitionData>
partitions = new ArrayList<>();
+ topic.partitions().forEach(partition -> {
+ if
(topicPartitionMap.get(topicId).contains(partition)) {
+ partitions.add(new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
+ }
+ });
+ deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(partitions));
+ }
+ } else {
+ deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName())
+ .setPartitions(topic.partitions().stream().map(
+ partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ).toList()));
+ }
+ });
+
+ // If the request for the persister is empty, just complete the operation right away.
+ if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ return CompletableFuture.completedFuture(new
DeleteShareGroupOffsetsResponseData().setResponses(deleteShareGroupOffsetsResponseTopicList));
+ }
+
+ return sendPersisterDeleteStateRequest(
+ requestData,
+ requestTopicIdToNameMapping,
+ deleteShareGroupStateRequestTopicsData,
+ deleteShareGroupOffsetsResponseTopicList
+ );
+ }).exceptionally(throwable -> {
+ log.error("Failed to get initialized topic partitions for the
group {}", groupId, throwable);
+ return
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
+ });
+ }
+
+ private CompletableFuture<DeleteShareGroupOffsetsResponseData>
checkIfGroupIsEmptyAndProcess(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ // This is done to make sure the provided group is empty. Offsets can be deleted only for an empty share group.
Review Comment:
This seems unnecessary.
There is no need to make the `describe-groups` call.
You can pass the request directly to the shard method; the shard can
check its internal state to verify that the group is empty and return the
appropriate persister request.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]