AndrewJSchofield commented on code in PR #19493:
URL: https://github.com/apache/kafka/pull/19493#discussion_r2079364838
##########
clients/src/main/resources/common/message/ListConfigResourcesResponse.json:
##########
@@ -16,18 +16,22 @@
{
"apiKey": 74,
"type": "response",
- "name": "ListClientMetricsResourcesResponse",
- "validVersions": "0",
+ "name": "ListConfigResourcesResponse",
+ // Version 0 is used as ListClientMetricsResourcesResponse which returns all
client metrics resources.
+ // Version 1 adds ResourceType to ConfigResources (KIP-1142).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
- { "name": "ClientMetricsResources", "type": "[]ClientMetricsResource",
"versions": "0+",
- "about": "Each client metrics resource in the response.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+",
- "about": "The resource name." }
+ { "name": "ConfigResources", "type": "[]ConfigResource", "versions":
"0+",
+ "about": "Each config resource in the response.", "fields": [
+ { "name": "ResourceName", "type": "string", "versions": "0+",
+ "about": "The resource name." },
+ { "name": "ResourceType", "type": "int8", "versions": "1+",
"ignorable": true,
Review Comment:
Should this be `"default": "16"` (the id of `ConfigResource.Type.CLIENT_METRICS`)?
I'm still researching this, so that's an open question.
##########
clients/src/main/resources/common/message/ListConfigResourcesRequest.json:
##########
@@ -17,10 +17,14 @@
"apiKey": 74,
"type": "request",
"listeners": ["broker"],
- "name": "ListClientMetricsResourcesRequest",
- "validVersions": "0",
+ "name": "ListConfigResourcesRequest",
+ // Version 0 is used as ListClientMetricsResourcesRequest which only lists
client metrics resources.
+ // Version 1 adds ResourceTypes field (KIP-1142). If there is no specified
ResourceTypes, it should return all configuration resources.
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
+ { "name": "ResourceTypes", "type": "[]int8", "versions": "1+",
"ignorable": true,
Review Comment:
I don't see the value of `ignorable` here. If it's a v0 request, the default is
essentially an array containing `ConfigResource.Type.CLIENT_METRICS`.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2874,16 +2874,61 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleListClientMetricsResources(request: RequestChannel.Request): Unit
= {
- val listClientMetricsResourcesRequest =
request.body[ListClientMetricsResourcesRequest]
+ private def handleListConfigResources(request: RequestChannel.Request): Unit
= {
+ val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER,
CLUSTER_NAME)) {
- requestHelper.sendMaybeThrottle(request,
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+ requestHelper.sendMaybeThrottle(request,
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
- val data = new
ListClientMetricsResourcesResponseData().setClientMetricsResources(
- clientMetricsManager.listClientMetricsResources.stream.map(
- name => new
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
- requestHelper.sendMaybeThrottle(request, new
ListClientMetricsResourcesResponse(data))
+ val data = new ListConfigResourcesResponseData()
+
+ if (request.header.apiVersion() == 0) {
+ // Version 0 only supports client metrics.
+
data.setConfigResources(clientMetricsManager.listClientMetricsResources.stream.map(name
=>
+ new
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id)
+ ).collect(Collectors.toList()))
+ } else {
+ // From version 1, supports all ConfigResource.Type.
+ var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+ if (resourceTypes.isEmpty) {
+ resourceTypes = util.List.of(
Review Comment:
Could we define a constant in `ConfigResource` that includes all of the
types, so that someone adding a new type does not risk forgetting to update this code?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2874,16 +2874,61 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleListClientMetricsResources(request: RequestChannel.Request): Unit
= {
- val listClientMetricsResourcesRequest =
request.body[ListClientMetricsResourcesRequest]
+ private def handleListConfigResources(request: RequestChannel.Request): Unit
= {
+ val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER,
CLUSTER_NAME)) {
- requestHelper.sendMaybeThrottle(request,
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+ requestHelper.sendMaybeThrottle(request,
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
- val data = new
ListClientMetricsResourcesResponseData().setClientMetricsResources(
- clientMetricsManager.listClientMetricsResources.stream.map(
- name => new
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
- requestHelper.sendMaybeThrottle(request, new
ListClientMetricsResourcesResponse(data))
+ val data = new ListConfigResourcesResponseData()
+
+ if (request.header.apiVersion() == 0) {
Review Comment:
I wonder whether this version-specific behaviour would be better put into
`ListConfigResourcesRequest.java` and `ListConfigResourcesResponse.java` as
appropriate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]