AndrewJSchofield commented on code in PR #19431:
URL: https://github.com/apache/kafka/pull/19431#discussion_r2039571274
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1580,83 +1574,51 @@ public
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
}
- Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
- List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new
ArrayList<>(requestData.topics().size());
-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList = new
ArrayList<>(requestData.topics().size());
-
- requestData.topics().forEach(topic -> {
- Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
- if (topicId != null) {
- requestTopicIdToNameMapping.put(topicId, topic.topicName());
- deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
- .setTopicId(topicId)
- .setPartitions(
- topic.partitions().stream().map(
- partitionIndex -> new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
- ).toList()
- ));
- } else {
- deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
- .setTopicName(topic.topicName())
- .setPartitions(topic.partitions().stream().map(
- partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
- ).toList()));
- }
- });
-
- // If the request for the persister is empty, just complete the
operation right away.
- if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ if (requestData.topics() == null || requestData.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
- .setResponses(deleteShareGroupOffsetsResponseTopicList));
+ );
}
- CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new
CompletableFuture<>();
+ return runtime.scheduleReadOperation(
+ "share-group-delete-offsets-request",
+ topicPartitionFor(groupId),
+ (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
+ )
+ .thenCompose(resultHolder -> {
+ if (resultHolder == null) {
+ log.error("Failed to retrieve deleteState request
parameters from group coordinator for the group {}", groupId);
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
+ );
+ }
+
+ if (resultHolder.topLevelError().code() != Errors.NONE.code())
{
+ log.error("Failed to retrieve deleteState request
parameters from group coordinator for the group {}", groupId);
Review Comment:
I don't think you should log an error here. This is the usual error path for
situations such as a non-existent group.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -290,6 +294,61 @@ public GroupCoordinatorShard build() {
}
}
+ public static class DeleteShareGroupOffsetsResultHolder {
+ private final Errors topLevelError;
Review Comment:
I think you should separate Errors into `short topLevelErrorCode` and
`String topLevelErrorMessage`. Otherwise, you'll be missing information about
why an error occurs (not a share group, or non-existent group).
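
A minimal sketch of that split, assuming a much-simplified holder. `ResultHolderSketch`, its `NONE` constant, and the sample messages used with it are hypothetical stand-ins: the real holder also carries the topic responses and the persister parameters, and the real code would come from `Errors.NONE.code()`. The point is only that one error code can arrive with different messages.

```java
// Hypothetical, simplified stand-in for DeleteShareGroupOffsetsResultHolder.
// The error code and the message travel separately, so the reason for the
// failure is not reduced to the generic text attached to the error code.
final class ResultHolderSketch {
    // Stand-in for Errors.NONE.code().
    static final short NONE = 0;

    private final short topLevelErrorCode;
    private final String topLevelErrorMessage;

    ResultHolderSketch(short topLevelErrorCode, String topLevelErrorMessage) {
        this.topLevelErrorCode = topLevelErrorCode;
        this.topLevelErrorMessage = topLevelErrorMessage;
    }

    short topLevelErrorCode() {
        return topLevelErrorCode;
    }

    String topLevelErrorMessage() {
        return topLevelErrorMessage;
    }

    // True when the shard reported a top-level error.
    boolean hasError() {
        return topLevelErrorCode != NONE;
    }
}
```

With this shape, "group does not exist" and "group is not a share group" can share a code and still be told apart by the message.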
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -612,6 +671,56 @@ public CoordinatorResult<Map<String,
Map.Entry<DeleteShareGroupStateParameters,
return new CoordinatorResult<>(List.of(), responseMap);
}
+ /**
+ * Does the following checks to make sure that a DeleteShareGroupOffsets
request is valid and can be processed further
+ * 1. Checks whether the provided group is empty
+ * 2. Checks the requested topics are presented in the metadataImage
+ * 3. Checks the requested share partitions are initialized for the group
+ *
+ * @param groupId - The group ID
+ * @param requestData - The request data for DeleteShareGroupOffsetsRequest
+ * @return {@link DeleteShareGroupOffsetsResultHolder} an object
containing top level error code, list of topic responses
+ * and persister deleteState
request parameters
+ */
+ public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ try {
+ ShareGroup group = groupMetadataManager.shareGroup(groupId);
+ group.validateDeleteGroup();
+
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData =
+ groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
+ groupId,
+ requestData,
+ errorTopicResponseList
+ );
+
+ if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ return new DeleteShareGroupOffsetsResultHolder(Errors.NONE,
errorTopicResponseList);
+ }
+
+ DeleteShareGroupStateRequestData deleteShareGroupStateRequestData
= new DeleteShareGroupStateRequestData()
+ .setGroupId(requestData.groupId())
+ .setTopics(deleteShareGroupStateRequestTopicsData);
+
+ return new DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE,
+ errorTopicResponseList,
+
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+ );
+
+ } catch (GroupIdNotFoundException exception) {
+ log.error("groupId {} not found", groupId, exception);
+ return new
DeleteShareGroupOffsetsResultHolder(Errors.forException(exception));
Review Comment:
Following the suggestion above to carry the error code and message separately,
`new DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(), exception.getMessage());`
is better.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8178,6 +8181,51 @@ public Optional<DeleteShareGroupStateParameters>
shareGroupBuildPartitionDeleteR
);
}
+ /**
+ * Returns a list of delete share group state request topic objects to be
used with the persister.
+ * @param groupId - group ID of the share group
+ * @param requestData - the request data for DeleteShareGroupOffsets
request
+ * @return List of objects representing the share group state delete
request for topics.
Review Comment:
nit: Missing `errorTopicResponseList` from the parameters list.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1580,83 +1574,51 @@ public
CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
}
- Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
- List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new
ArrayList<>(requestData.topics().size());
-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList = new
ArrayList<>(requestData.topics().size());
-
- requestData.topics().forEach(topic -> {
- Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
- if (topicId != null) {
- requestTopicIdToNameMapping.put(topicId, topic.topicName());
- deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
- .setTopicId(topicId)
- .setPartitions(
- topic.partitions().stream().map(
- partitionIndex -> new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
- ).toList()
- ));
- } else {
- deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
- .setTopicName(topic.topicName())
- .setPartitions(topic.partitions().stream().map(
- partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
- ).toList()));
- }
- });
-
- // If the request for the persister is empty, just complete the
operation right away.
- if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ if (requestData.topics() == null || requestData.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
- .setResponses(deleteShareGroupOffsetsResponseTopicList));
+ );
}
- CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new
CompletableFuture<>();
+ return runtime.scheduleReadOperation(
+ "share-group-delete-offsets-request",
+ topicPartitionFor(groupId),
+ (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
+ )
+ .thenCompose(resultHolder -> {
+ if (resultHolder == null) {
+ log.error("Failed to retrieve deleteState request
parameters from group coordinator for the group {}", groupId);
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
+ );
+ }
+
+ if (resultHolder.topLevelError().code() != Errors.NONE.code())
{
+ log.error("Failed to retrieve deleteState request
parameters from group coordinator for the group {}", groupId);
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(resultHolder.topLevelError())
Review Comment:
This is a case where you want to preserve the error message so that it's
available to the admin client.
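
A rough sketch of what that could look like in the `thenCompose` step, using only `java.util.concurrent.CompletableFuture`. `HolderSketch`, `ResponseSketch`, and `ErrorPropagationSketch` are hypothetical stand-ins, not the real Kafka classes, and it assumes the response can carry a top-level message alongside the code. The expected error path passes both through without an error-level log.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the result returned by the coordinator shard.
final class HolderSketch {
    final short errorCode;
    final String errorMessage;

    HolderSketch(short errorCode, String errorMessage) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

// Hypothetical stand-in for the response data sent back to the admin client.
final class ResponseSketch {
    final short errorCode;
    final String errorMessage;

    ResponseSketch(short errorCode, String errorMessage) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

final class ErrorPropagationSketch {
    // Stand-in for Errors.NONE.code().
    static final short NONE = 0;

    static CompletableFuture<ResponseSketch> handle(CompletableFuture<HolderSketch> fromShard) {
        return fromShard.thenCompose(holder -> {
            if (holder.errorCode != NONE) {
                // Expected path (for example a non-existent group): no error log,
                // and both the code and the message are copied into the response.
                return CompletableFuture.completedFuture(
                    new ResponseSketch(holder.errorCode, holder.errorMessage));
            }
            // Success path; the real code would go on to call the persister here.
            return CompletableFuture.completedFuture(new ResponseSketch(NONE, null));
        });
    }
}
```

The design choice is that the service layer never rebuilds the message from the code, so whatever reason the shard recorded is what the client sees.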
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -612,6 +671,56 @@ public CoordinatorResult<Map<String,
Map.Entry<DeleteShareGroupStateParameters,
return new CoordinatorResult<>(List.of(), responseMap);
}
+ /**
+ * Does the following checks to make sure that a DeleteShareGroupOffsets
request is valid and can be processed further
+ * 1. Checks whether the provided group is empty
+ * 2. Checks the requested topics are presented in the metadataImage
+ * 3. Checks the requested share partitions are initialized for the group
+ *
+ * @param groupId - The group ID
+ * @param requestData - The request data for DeleteShareGroupOffsetsRequest
+ * @return {@link DeleteShareGroupOffsetsResultHolder} an object
containing top level error code, list of topic responses
+ * and persister deleteState
request parameters
+ */
+ public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ try {
+ ShareGroup group = groupMetadataManager.shareGroup(groupId);
+ group.validateDeleteGroup();
+
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData =
+ groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
+ groupId,
+ requestData,
+ errorTopicResponseList
+ );
+
+ if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ return new DeleteShareGroupOffsetsResultHolder(Errors.NONE,
errorTopicResponseList);
+ }
+
+ DeleteShareGroupStateRequestData deleteShareGroupStateRequestData
= new DeleteShareGroupStateRequestData()
+ .setGroupId(requestData.groupId())
+ .setTopics(deleteShareGroupStateRequestTopicsData);
+
+ return new DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE,
+ errorTopicResponseList,
+
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+ );
+
+ } catch (GroupIdNotFoundException exception) {
+ log.error("groupId {} not found", groupId, exception);
+ return new
DeleteShareGroupOffsetsResultHolder(Errors.forException(exception));
+ } catch (GroupNotEmptyException exception) {
+ log.error("Provided group {} is not empty", groupId);
+ return new
DeleteShareGroupOffsetsResultHolder(Errors.forException(exception));
Review Comment:
Likewise here, prefer
`new DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(), exception.getMessage());`