This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7997c9ebe0c KAFKA-20620: Add StreamsGroupTopologyDescriptionUpdate RPC
schema and extend StreamsGroupDescribe/Heartbeat (#22397)
7997c9ebe0c is described below
commit 7997c9ebe0c651432b28c7e7a7dc62ce456b33b4
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Jun 3 11:05:03 2026 +0200
KAFKA-20620: Add StreamsGroupTopologyDescriptionUpdate RPC schema and
extend StreamsGroupDescribe/Heartbeat (#22397)
First sub-task of
[KIP-1331](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331%3A+Streams+Group+Topology+Description+Plugin)
(Streams Group Topology Description Plugin). Adds the wire-format
scaffolding only — no broker handler or client logic yet (those land in
later tickets).
- New RPC `StreamsGroupTopologyDescriptionUpdate` (apiKey 93) —
request/response schemas + Java wrappers, wired through `ApiKeys`,
`AbstractRequest`, `AbstractResponse`,
`RequestConvertToJson`.
- New error codes: `GROUP_DELETION_FAILED` (134),
`STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED` (135), with matching
exception classes.
- `StreamsGroupHeartbeat{Request,Response}` bumped to v1; the response adds
`TopologyDescriptionRequired`.
- `StreamsGroupDescribeRequest` bumped to v1 with `IncludeTopologyDescription`.
- `StreamsGroupDescribeResponse` bumped to v1 with `TopologyDescription` +
`TopologyDescriptionStatus`.
- `DeleteGroups{Request,Response}` bumped to v3 with per-group
`ErrorMessage`.
Reviewers: Lucas Brutschy <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../errors/GroupDeletionFailedException.java | 31 +++++
...msTopologyDescriptionUpdateFailedException.java | 32 +++++
.../org/apache/kafka/common/protocol/ApiKeys.java | 3 +-
.../org/apache/kafka/common/protocol/Errors.java | 6 +-
.../kafka/common/requests/AbstractRequest.java | 2 +
.../kafka/common/requests/AbstractResponse.java | 2 +
.../requests/StreamsGroupDescribeResponse.java | 6 +
...reamsGroupTopologyDescriptionUpdateRequest.java | 83 +++++++++++
...amsGroupTopologyDescriptionUpdateResponse.java} | 31 ++---
.../common/message/DeleteGroupsRequest.json | 6 +-
.../common/message/DeleteGroupsResponse.json | 9 +-
.../message/StreamsGroupDescribeRequest.json | 11 +-
.../message/StreamsGroupDescribeResponse.json | 45 +++++-
.../message/StreamsGroupHeartbeatRequest.json | 7 +-
.../message/StreamsGroupHeartbeatResponse.json | 6 +-
...reamsGroupTopologyDescriptionUpdateRequest.json | 71 ++++++++++
...amsGroupTopologyDescriptionUpdateResponse.json} | 33 +++--
.../kafka/common/requests/RequestResponseTest.java | 152 +++++++++++++++++++--
core/src/main/scala/kafka/server/KafkaApis.scala | 10 ++
.../scala/unit/kafka/server/RequestQuotaTest.scala | 3 +
.../apache/kafka/network/RequestConvertToJson.java | 8 ++
21 files changed, 501 insertions(+), 56 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java
new file mode 100644
index 00000000000..eaa83160526
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that {@code DeleteGroups} could not complete for the affected group. The
+ * accompanying error message describes the underlying cause; the caller may retry once
+ * the underlying condition is resolved.
+ */
+public class GroupDeletionFailedException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public GroupDeletionFailedException(String message) {
+ super(message);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java
new file mode 100644
index 00000000000..df981b8b5b9
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that the streams group topology description plugin failed to process
+ * a StreamsGroupTopologyDescriptionUpdate request. The accompanying error message
+ * describes the underlying cause; the broker tracks the transient-vs-permanent
+ * distinction internally and does not reflect it on the wire.
+ */
+public class StreamsTopologyDescriptionUpdateFailedException extends
ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public StreamsTopologyDescriptionUpdateFailedException(String message) {
+ super(message);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 79b283b4f8d..22dc37e67d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -137,7 +137,8 @@ public enum ApiKeys {
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
- DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
+ DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
+
STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE(ApiMessageType.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>>
APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a27a7fcf23c..15ac7765a29 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -45,6 +45,7 @@ import
org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.FetchSessionTopicIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupDeletionFailedException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
@@ -122,6 +123,7 @@ import
org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import
org.apache.kafka.common.errors.StreamsTopologyDescriptionUpdateFailedException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
@@ -418,7 +420,9 @@ public enum Errors {
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.",
StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is
invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.",
StreamsTopologyFencedException::new),
- SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been
reached.", ShareSessionLimitReachedException::new);
+ SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been
reached.", ShareSessionLimitReachedException::new),
+ GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error
message on the per-group result for details.",
GroupDeletionFailedException::new),
+ STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The broker could not
process the topology description update; see the error message for details.",
StreamsTopologyDescriptionUpdateFailedException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 750de2050f4..8630c379ed6 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -354,6 +354,8 @@ public abstract class AbstractRequest implements
AbstractRequestResponse {
return AlterShareGroupOffsetsRequest.parse(readable,
apiVersion);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsRequest.parse(readable,
apiVersion);
+ case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
+ return
StreamsGroupTopologyDescriptionUpdateRequest.parse(readable, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not
currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7d96d1a1731..961559d9e8f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -290,6 +290,8 @@ public abstract class AbstractResponse implements
AbstractRequestResponse {
return AlterShareGroupOffsetsResponse.parse(readable, version);
case DELETE_SHARE_GROUP_OFFSETS:
return DeleteShareGroupOffsetsResponse.parse(readable,
version);
+ case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
+ return
StreamsGroupTopologyDescriptionUpdateResponse.parse(readable, version);
default:
throw new AssertionError(String.format("ApiKey %s is not
currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
index efee6e521f4..8423b0b69ec 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
@@ -38,6 +38,12 @@ import java.util.Map;
*/
public class StreamsGroupDescribeResponse extends AbstractResponse {
+ // TopologyDescriptionStatus int8 values (v1+). These values must not change.
+ public static final byte TOPOLOGY_DESCRIPTION_STATUS_NOT_REQUESTED = 0;
+ public static final byte TOPOLOGY_DESCRIPTION_STATUS_NOT_STORED = 1;
+ public static final byte TOPOLOGY_DESCRIPTION_STATUS_ERROR = 2;
+ public static final byte TOPOLOGY_DESCRIPTION_STATUS_AVAILABLE = 3;
+
private final StreamsGroupDescribeResponseData data;
public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data)
{
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java
new file mode 100644
index 00000000000..d476f7ac44d
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Readable;
+
+/**
+ * Sent by a Streams client to push its topology description to the broker, in response
+ * to {@code TopologyDescriptionRequired=true} on a {@code StreamsGroupHeartbeatResponse}.
+ * The broker validates that {@code MemberId} still belongs to the group, checks the
+ * {@code TopologyEpoch} against the group's current epoch, and persists the description.
+ * See KIP-1331.
+ *
+ * <p>Legal error codes are documented on {@link StreamsGroupTopologyDescriptionUpdateResponse}.
+ */
+public class StreamsGroupTopologyDescriptionUpdateRequest extends
AbstractRequest {
+
+ public static class Builder extends
AbstractRequest.Builder<StreamsGroupTopologyDescriptionUpdateRequest> {
+ private final StreamsGroupTopologyDescriptionUpdateRequestData data;
+
+ public Builder(StreamsGroupTopologyDescriptionUpdateRequestData data) {
+ // The schema is marked latestVersionUnstable until the broker handler lands; opt in
+ // here so the Builder can still construct the only existing version.
+ super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, true);
+ this.data = data;
+ }
+
+ @Override
+ public StreamsGroupTopologyDescriptionUpdateRequest build(short
version) {
+ return new StreamsGroupTopologyDescriptionUpdateRequest(data,
version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+
+ private final StreamsGroupTopologyDescriptionUpdateRequestData data;
+
+ public
StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescriptionUpdateRequestData
data, short version) {
+ super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, version);
+ this.data = data;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ ApiError apiError = ApiError.fromThrowable(e);
+ return new StreamsGroupTopologyDescriptionUpdateResponse(
+ new StreamsGroupTopologyDescriptionUpdateResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message())
+ );
+ }
+
+ @Override
+ public StreamsGroupTopologyDescriptionUpdateRequestData data() {
+ return data;
+ }
+
+ public static StreamsGroupTopologyDescriptionUpdateRequest parse(Readable
readable, short version) {
+ return new StreamsGroupTopologyDescriptionUpdateRequest(
+ new StreamsGroupTopologyDescriptionUpdateRequestData(readable,
version), version);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java
similarity index 63%
copy from
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
copy to
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java
index efee6e521f4..fb93bf49ccd 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java
@@ -16,12 +16,11 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.EnumMap;
import java.util.Map;
/**
@@ -32,31 +31,28 @@ import java.util.Map;
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
- * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#UNSUPPORTED_VERSION}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
- * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
+ * - {@link Errors#STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED}
*/
-public class StreamsGroupDescribeResponse extends AbstractResponse {
+public class StreamsGroupTopologyDescriptionUpdateResponse extends
AbstractResponse {
- private final StreamsGroupDescribeResponseData data;
+ private final StreamsGroupTopologyDescriptionUpdateResponseData data;
- public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data)
{
- super(ApiKeys.STREAMS_GROUP_DESCRIBE);
+ public
StreamsGroupTopologyDescriptionUpdateResponse(StreamsGroupTopologyDescriptionUpdateResponseData
data) {
+ super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
this.data = data;
}
@Override
- public StreamsGroupDescribeResponseData data() {
+ public StreamsGroupTopologyDescriptionUpdateResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
- Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
- data.groups().forEach(
- group -> updateErrorCounts(counts,
Errors.forCode(group.errorCode()))
- );
- return counts;
+ return errorCounts(Errors.forCode(data.errorCode()));
}
@Override
@@ -69,9 +65,8 @@ public class StreamsGroupDescribeResponse extends
AbstractResponse {
data.setThrottleTimeMs(throttleTimeMs);
}
- public static StreamsGroupDescribeResponse parse(Readable readable, short
version) {
- return new StreamsGroupDescribeResponse(
- new StreamsGroupDescribeResponseData(readable, version)
- );
+ public static StreamsGroupTopologyDescriptionUpdateResponse parse(Readable
readable, short version) {
+ return new StreamsGroupTopologyDescriptionUpdateResponse(
+ new StreamsGroupTopologyDescriptionUpdateResponseData(readable,
version));
}
}
diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json
b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
index 7d7c4371789..5c55a9939b7 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
@@ -21,7 +21,11 @@
// Version 1 is the same as version 0.
//
// Version 2 is the first flexible version.
- "validVersions": "0-2",
+ //
+ // Version 3 corresponds to the addition of an ErrorMessage field on each per-group
+ // result in the response (populated when the broker needs to surface a cause to the
+ // caller). The request body shape is unchanged at version 3.
+ "validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "GroupsNames", "type": "[]string", "versions": "0+",
"entityType": "groupId",
diff --git
a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
b/clients/src/main/resources/common/message/DeleteGroupsResponse.json
index 168cde03ba3..4f5ad712a63 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsResponse.json
@@ -20,7 +20,10 @@
// Starting in version 1, on quota violation, brokers send out responses
before throttling.
//
// Version 2 is the first flexible version.
- "validVersions": "0-2",
+ //
+ // Version 3 adds the per-group ErrorMessage field so the broker can surface a cause
+ // string when GROUP_DELETION_FAILED (or any other non-NONE error) is returned for a group.
+ "validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -30,7 +33,9 @@
{ "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true,
"entityType": "groupId",
"about": "The group id." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
- "about": "The deletion error, or 0 if the deletion succeeded." }
+ "about": "The deletion error, or 0 if the deletion succeeded." },
+ { "name": "ErrorMessage", "type": "string", "versions": "3+",
"nullableVersions": "3+", "ignorable": true, "default": "null",
+ "about": "The error message, or null if there was no error." }
]}
]
}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index 6e36479043a..7d3741f1d33 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -18,12 +18,17 @@
"type": "request",
"listeners": ["broker"],
"name": "StreamsGroupDescribeRequest",
- "validVersions": "0",
+ // Version 1 adds IncludeTopologyDescription (KIP-1331). Marked unstable until the broker
+ // topology-description manager lands; flip to false before release.
+ "latestVersionUnstable": true,
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType":
"groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
- "about": "Whether to include authorized operations." }
+ "about": "Whether to include authorized operations." },
+ { "name": "IncludeTopologyDescription", "type": "bool", "versions": "1+",
"default": "false",
+ "about": "Whether to include the full topology description from the
topology description plugin in the response." }
]
-}
\ No newline at end of file
+}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index b99f9c00b08..8b6eb165e18 100644
---
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -17,7 +17,8 @@
"apiKey": 89,
"type": "response",
"name": "StreamsGroupDescribeResponse",
- "validVersions": "0",
+ // Version 1 adds TopologyDescription and TopologyDescriptionStatus (KIP-1331).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -106,7 +107,12 @@
"about": "True for classic members that have not been upgraded
yet." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+",
"default": "-2147483648",
- "about": "32-bit bitfield to represent authorized operations for
this group." }
+ "about": "32-bit bitfield to represent authorized operations for
this group." },
+ { "name": "TopologyDescription", "type": "TopologyDescription",
"versions": "1+",
+ "nullableVersions": "1+", "default": "null",
+ "about": "The full topology description for this group. Non-null if
and only if TopologyDescriptionStatus is AVAILABLE (3); null otherwise." },
+ { "name": "TopologyDescriptionStatus", "type": "int8", "versions":
"1+", "default": "0",
+ "about": "The status of the topology description for this group,
paired with TopologyDescription: 0=NOT_REQUESTED (client did not set
IncludeTopologyDescription; TopologyDescription is null); 1=NOT_STORED (no
description recorded for this group; TopologyDescription is null); 2=ERROR
(broker failed to fetch the description, see broker logs; TopologyDescription
is null); 3=AVAILABLE (TopologyDescription is non-null and carries the
description)." }
]
}
],
@@ -155,6 +161,39 @@
{ "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
"about": "Topic-level configurations as key-value pairs."
}
+ ]},
+ { "name": "TopologyDescription", "versions": "1+", "fields": [
+ { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology",
"versions": "1+",
+ "about": "The subtopologies that make up this topology." },
+ { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore",
"versions": "1+",
+ "about": "Global state stores used by this topology." }
+ ]},
+ { "name": "TopologyDescriptionSubtopology", "versions": "1+", "fields": [
+ { "name": "SubtopologyId", "type": "string", "versions": "1+",
+ "about": "The subtopology identifier, unique within the topology." },
+ { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "1+",
+ "about": "The processing nodes in this subtopology." }
+ ]},
+ { "name": "TopologyDescriptionNode", "versions": "1+", "fields": [
+ { "name": "Name", "type": "string", "versions": "1+",
+ "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
+ { "name": "NodeType", "type": "int8", "versions": "1+",
+ "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
+ { "name": "SourceTopics", "type": "[]string", "versions": "1+",
"entityType": "topicName",
+ "about": "The source topics this node reads from. Defined only for
source nodes, may be empty if source topics are dynamically determined." },
+ { "name": "SinkTopic", "type": "string", "versions": "1+", "entityType":
"topicName",
+ "nullableVersions": "1+", "default": "null",
+ "about": "The topic this node writes to. Defined only for sink nodes,
may be null if sink topic is dynamically determined." },
+ { "name": "Stores", "type": "[]string", "versions": "1+",
+ "about": "The state store names accessed by this node. Defined only
for processor nodes." },
+ { "name": "Successors", "type": "[]string", "versions": "1+",
+ "about": "The names of successor nodes in the processing graph.
Predecessor relationships are reconstructed from this field." }
+ ]},
+ { "name": "TopologyDescriptionGlobalStore", "versions": "1+", "fields": [
+ { "name": "Source", "type": "TopologyDescriptionNode", "versions": "1+",
+ "about": "The source node providing data to the global store." },
+ { "name": "Processor", "type": "TopologyDescriptionNode", "versions":
"1+",
+ "about": "The processor node that populates the global store." }
]}
]
-}
\ No newline at end of file
+}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index a2cba46e763..dd03c28d07c 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -18,7 +18,12 @@
"type": "request",
"listeners": ["broker"],
"name": "StreamsGroupHeartbeatRequest",
- "validVersions": "0",
+ // Version 1 is the same as version 0; bumped together with StreamsGroupHeartbeatResponse v1,
+ // which adds TopologyDescriptionRequired (KIP-1331). Required so the response v1 is negotiated.
+ // Marked unstable until the broker topology-description manager lands (KIP-1331); flip to false
+ // before release.
+ "latestVersionUnstable": true,
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 538816f4129..6422712ba52 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -17,7 +17,8 @@
"apiKey": 88,
"type": "response",
"name": "StreamsGroupHeartbeatResponse",
- "validVersions": "0",
+ // Version 1 adds TopologyDescriptionRequired (KIP-1331).
+ "validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -65,6 +66,9 @@
{ "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+",
"nullableVersions": "0+", "default": "null",
"about": "Assigned warm-up tasks for this client. Null if unchanged
since last heartbeat." },
+ { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "1+",
"default": "false",
+ "about": "True if the broker does not have an up-to-date topology
description for this group. The client should send the topology description via
StreamsGroupTopologyDescriptionUpdate." },
+
// IQ-related information
{ "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+",
"about": "The endpoint epoch set in the response"},
diff --git
a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json
new file mode 100644
index 00000000000..019ee189721
--- /dev/null
+++
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 93,
+ "type": "request",
+ "listeners": ["broker"],
+ "name": "StreamsGroupTopologyDescriptionUpdateRequest",
+ // The broker handler is a stub returning UNSUPPORTED_VERSION; the real
handler lands in a
+ // later sub-task of KIP-1331. Flip to false before release.
+ "latestVersionUnstable": true,
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
+ "about": "The streams group identifier." },
+ { "name": "MemberId", "type": "string", "versions": "0+",
+ "about": "The ID of the streams group member sending the push. The
broker validates that this member is still in the group; mismatches (including
when the group itself has been deleted) are rejected with UNKNOWN_MEMBER_ID so
the client treats itself as fenced and rejoins." },
+ { "name": "TopologyEpoch", "type": "int32", "versions": "0+",
+ "about": "The epoch of the topology being described." },
+ { "name": "TopologyDescription", "type": "TopologyDescription",
"versions": "0+",
+ "about": "The topology description." }
+ ],
+ "commonStructs": [
+ { "name": "TopologyDescription", "versions": "0+", "fields": [
+ { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology",
"versions": "0+",
+ "about": "The subtopologies that make up this topology." },
+ { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore",
"versions": "0+",
+ "about": "Global state stores used by this topology." }
+ ]},
+ { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [
+ { "name": "SubtopologyId", "type": "string", "versions": "0+",
+ "about": "The subtopology identifier, unique within the topology." },
+ { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+",
+ "about": "The processing nodes in this subtopology." }
+ ]},
+ { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [
+ { "name": "Name", "type": "string", "versions": "0+",
+ "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
+ { "name": "NodeType", "type": "int8", "versions": "0+",
+ "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
+ { "name": "SourceTopics", "type": "[]string", "versions": "0+",
"entityType": "topicName",
+ "about": "The source topics this node reads from. Defined only for
source nodes, may be empty if source topics are dynamically determined." },
+ { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType":
"topicName",
+ "nullableVersions": "0+", "default": "null",
+ "about": "The topic this node writes to. Defined only for sink nodes,
may be null if sink topic is dynamically determined." },
+ { "name": "Stores", "type": "[]string", "versions": "0+",
+ "about": "The state store names accessed by this node. Defined only
for processor nodes." },
+ { "name": "Successors", "type": "[]string", "versions": "0+",
+ "about": "The names of successor nodes in the processing graph.
Predecessor relationships are reconstructed from this field." }
+ ]},
+ { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [
+ { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+",
+ "about": "The source node providing data to the global store." },
+ { "name": "Processor", "type": "TopologyDescriptionNode", "versions":
"0+",
+ "about": "The processor node that populates the global store." }
+ ]}
+ ]
+}
diff --git
a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json
similarity index 54%
copy from clients/src/main/resources/common/message/DeleteGroupsResponse.json
copy to
clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json
index 168cde03ba3..e03b4215cd3 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json
@@ -14,23 +14,28 @@
// limitations under the License.
{
- "apiKey": 42,
+ "apiKey": 93,
"type": "response",
- "name": "DeleteGroupsResponse",
- // Starting in version 1, on quota violation, brokers send out responses
before throttling.
- //
- // Version 2 is the first flexible version.
- "validVersions": "0-2",
- "flexibleVersions": "2+",
+ "name": "StreamsGroupTopologyDescriptionUpdateResponse",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ // Supported errors:
+ // - GROUP_AUTHORIZATION_FAILED (version 0+)
+ // - NOT_COORDINATOR (version 0+)
+ // - COORDINATOR_NOT_AVAILABLE (version 0+)
+ // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+ // - INVALID_REQUEST (version 0+)
+ // - UNSUPPORTED_VERSION (version 0+)
+ // - UNKNOWN_MEMBER_ID (version 0+)
+ // - GROUP_ID_NOT_FOUND (version 0+)
+ // - STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
- { "name": "Results", "type": "[]DeletableGroupResult", "versions": "0+",
- "about": "The deletion results.", "fields": [
- { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true,
"entityType": "groupId",
- "about": "The group id." },
- { "name": "ErrorCode", "type": "int16", "versions": "0+",
- "about": "The deletion error, or 0 if the deletion succeeded." }
- ]}
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The top-level error code, or 0 if there was no error." },
+ { "name": "ErrorMessage", "type": "string", "versions": "0+",
+ "nullableVersions": "0+", "default": "null",
+ "about": "The top-level error message, or null if there was no error." }
]
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 29bc1213c63..26a13f561bb 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -234,6 +234,8 @@ import
org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -703,6 +705,17 @@ public class RequestResponseTest {
assertTrue(exception.getMessage().contains("[foo, bar]"));
}
+ @Test
+ public void testDeleteGroupsResponseV3PreservesErrorMessage() {
+ DeleteGroupsResponse response = createDeleteGroupsResponse();
+ short version = ApiKeys.DELETE_GROUPS.latestVersion();
+ DeleteGroupsResponse parsed =
DeleteGroupsResponse.parse(response.serialize(version), version);
+ DeletableGroupResult failed =
parsed.data().results().find("failed-group");
+ assertNotNull(failed);
+ assertEquals(Errors.GROUP_DELETION_FAILED.code(), failed.errorCode());
+ assertEquals("plugin offline", failed.errorMessage());
+ }
+
@Test
public void testFetchRequestIsolationLevel() {
FetchRequest request = createFetchRequest((short) 4,
IsolationLevel.READ_COMMITTED);
@@ -1076,6 +1089,7 @@ public class RequestResponseTest {
case DESCRIBE_SHARE_GROUP_OFFSETS: return
createDescribeShareGroupOffsetsRequest(version);
case ALTER_SHARE_GROUP_OFFSETS: return
createAlterShareGroupOffsetsRequest(version);
case DELETE_SHARE_GROUP_OFFSETS: return
createDeleteShareGroupOffsetsRequest(version);
+ case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return
createStreamsGroupTopologyDescriptionUpdateRequest(version);
default: throw new IllegalArgumentException("Unknown API key " +
apikey);
}
}
@@ -1166,11 +1180,12 @@ public class RequestResponseTest {
case WRITE_SHARE_GROUP_STATE: return
createWriteShareGroupStateResponse();
case DELETE_SHARE_GROUP_STATE: return
createDeleteShareGroupStateResponse();
case READ_SHARE_GROUP_STATE_SUMMARY: return
createReadShareGroupStateSummaryResponse();
- case STREAMS_GROUP_HEARTBEAT: return
createStreamsGroupHeartbeatResponse();
- case STREAMS_GROUP_DESCRIBE: return
createStreamsGroupDescribeResponse();
+ case STREAMS_GROUP_HEARTBEAT: return
createStreamsGroupHeartbeatResponse(version);
+ case STREAMS_GROUP_DESCRIBE: return
createStreamsGroupDescribeResponse(version);
case DESCRIBE_SHARE_GROUP_OFFSETS: return
createDescribeShareGroupOffsetsResponse();
case ALTER_SHARE_GROUP_OFFSETS: return
createAlterShareGroupOffsetsResponse();
case DELETE_SHARE_GROUP_OFFSETS: return
createDeleteShareGroupOffsetsResponse();
+ case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return
createStreamsGroupTopologyDescriptionUpdateResponse();
default: throw new IllegalArgumentException("Unknown API key " +
apikey);
}
}
@@ -2286,6 +2301,10 @@ public class RequestResponseTest {
result.add(new DeletableGroupResult()
.setGroupId("test-group")
.setErrorCode(Errors.NONE.code()));
+ result.add(new DeletableGroupResult()
+ .setGroupId("failed-group")
+ .setErrorCode(Errors.GROUP_DELETION_FAILED.code())
+ .setErrorMessage("plugin offline"));
return new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(result)
@@ -3857,20 +3876,42 @@ public class RequestResponseTest {
}
private AbstractRequest createStreamsGroupDescribeRequest(final short
version) {
- return new StreamsGroupDescribeRequest.Builder(new
StreamsGroupDescribeRequestData()
+ StreamsGroupDescribeRequestData data = new
StreamsGroupDescribeRequestData()
.setGroupIds(Collections.singletonList("group"))
- .setIncludeAuthorizedOperations(false)).build(version);
+ .setIncludeAuthorizedOperations(false);
+ if (version >= 1) {
+ data.setIncludeTopologyDescription(true);
+ }
+ return new StreamsGroupDescribeRequest.Builder(data).build(version);
}
private AbstractRequest createStreamsGroupHeartbeatRequest(final short
version) {
return new StreamsGroupHeartbeatRequest.Builder(new
StreamsGroupHeartbeatRequestData()).build(version);
}
- private AbstractResponse createStreamsGroupDescribeResponse() {
- StreamsGroupDescribeResponseData data = new
StreamsGroupDescribeResponseData()
- .setGroups(Collections.singletonList(
+ private AbstractResponse createStreamsGroupDescribeResponse(final short
version) {
+ StreamsGroupDescribeResponseData.DescribedGroup group =
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group")
+ .setErrorCode((short) 0)
+ .setErrorMessage(Errors.forCode((short) 0).message())
+ .setGroupState("EMPTY")
+ .setGroupEpoch(0)
+ .setAssignmentEpoch(0)
+ .setMembers(new ArrayList<>(0))
+ .setTopology(null);
+ if (version >= 1) {
+ group.setTopologyDescription(new
StreamsGroupDescribeResponseData.TopologyDescription()
+ .setSubtopologies(new ArrayList<>(0))
+ .setGlobalStores(new ArrayList<>(0)));
+
group.setTopologyDescriptionStatus(StreamsGroupDescribeResponse.TOPOLOGY_DESCRIPTION_STATUS_AVAILABLE);
+ }
+ List<StreamsGroupDescribeResponseData.DescribedGroup> groups = new
ArrayList<>();
+ groups.add(group);
+ if (version >= 1) {
+ StreamsGroupDescribeResponseData.DescribedGroup notStoredGroup =
new StreamsGroupDescribeResponseData.DescribedGroup()
- .setGroupId("group")
+ .setGroupId("group-without-description")
.setErrorCode((short) 0)
.setErrorMessage(Errors.forCode((short) 0).message())
.setGroupState("EMPTY")
@@ -3878,13 +3919,82 @@ public class RequestResponseTest {
.setAssignmentEpoch(0)
.setMembers(new ArrayList<>(0))
.setTopology(null)
- ))
+ .setTopologyDescription(null)
+
.setTopologyDescriptionStatus(StreamsGroupDescribeResponse.TOPOLOGY_DESCRIPTION_STATUS_NOT_STORED);
+ groups.add(notStoredGroup);
+ }
+ StreamsGroupDescribeResponseData data = new
StreamsGroupDescribeResponseData()
+ .setGroups(groups)
.setThrottleTimeMs(1000);
return new StreamsGroupDescribeResponse(data);
}
- private AbstractResponse createStreamsGroupHeartbeatResponse() {
- return new StreamsGroupHeartbeatResponse(new
StreamsGroupHeartbeatResponseData());
+ private AbstractResponse createStreamsGroupHeartbeatResponse(final short
version) {
+ StreamsGroupHeartbeatResponseData data = new
StreamsGroupHeartbeatResponseData();
+ if (version >= 1) {
+ data.setTopologyDescriptionRequired(true);
+ }
+ return new StreamsGroupHeartbeatResponse(data);
+ }
+
+ private AbstractRequest
createStreamsGroupTopologyDescriptionUpdateRequest(final short version) {
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode
sourceNode =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+ .setName("KSTREAM-SOURCE-0000000000")
+ .setNodeType((byte) 1)
+ .setSourceTopics(List.of("input-topic"))
+ .setSuccessors(List.of("KSTREAM-PROCESSOR-0000000001"));
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode
processorNode =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+ .setName("KSTREAM-PROCESSOR-0000000001")
+ .setNodeType((byte) 2)
+ .setStores(List.of("store-1"))
+ .setSuccessors(List.of("KSTREAM-SINK-0000000002"));
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode
sinkNode =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+ .setName("KSTREAM-SINK-0000000002")
+ .setNodeType((byte) 3)
+ .setSinkTopic("output-topic");
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode
dynamicSinkNode =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+ .setName("KSTREAM-SINK-0000000005")
+ .setNodeType((byte) 3)
+ .setSinkTopic(null);
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology
subtopology =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology()
+ .setSubtopologyId("0")
+ .setNodes(List.of(sourceNode, processorNode, sinkNode,
dynamicSinkNode));
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode
globalSource =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+ .setName("KSTREAM-GLOBAL-SOURCE-0000000003")
+ .setNodeType((byte) 1)
+ .setSourceTopics(List.of("global-topic"));
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode
globalProcessor =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+ .setName("KTABLE-SOURCE-0000000004")
+ .setNodeType((byte) 2)
+ .setStores(List.of("global-store"));
+
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore
globalStore =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore()
+ .setSource(globalSource)
+ .setProcessor(globalProcessor);
+ StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription
topology =
+ new
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()
+ .setSubtopologies(List.of(subtopology))
+ .setGlobalStores(List.of(globalStore));
+ return new StreamsGroupTopologyDescriptionUpdateRequest.Builder(
+ new StreamsGroupTopologyDescriptionUpdateRequestData()
+ .setGroupId("test-group")
+ .setMemberId("test-member")
+ .setTopologyEpoch(1)
+ .setTopologyDescription(topology)
+ ).build(version);
+ }
+
+ private AbstractResponse
createStreamsGroupTopologyDescriptionUpdateResponse() {
+ return new StreamsGroupTopologyDescriptionUpdateResponse(
+ new StreamsGroupTopologyDescriptionUpdateResponseData()
+ );
}
@Test
@@ -4018,4 +4128,24 @@ public class RequestResponseTest {
assertThrows(UnsupportedVersionException.class, () -> new
ListConfigResourcesRequest.Builder(data).build((short) 0));
});
}
+
+ @Test
+ public void
testStreamsGroupDescribeRequestV0RejectsIncludeTopologyDescription() {
+ StreamsGroupDescribeRequestData data = new
StreamsGroupDescribeRequestData()
+ .setGroupIds(List.of("g1"))
+ .setIncludeTopologyDescription(true);
+ StreamsGroupDescribeRequest request = new
StreamsGroupDescribeRequest.Builder(data).build((short) 0);
+ assertThrows(UnsupportedVersionException.class, () ->
request.serialize());
+ }
+
+ @Test
+ public void
testStreamsGroupDescribeResponseV0RejectsTopologyDescriptionFields() {
+ StreamsGroupDescribeResponseData data = new
StreamsGroupDescribeResponseData()
+ .setGroups(List.of(new
StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("g1")
+ .setTopologyDescription(new
StreamsGroupDescribeResponseData.TopologyDescription())
+
.setTopologyDescriptionStatus(StreamsGroupDescribeResponse.TOPOLOGY_DESCRIPTION_STATUS_AVAILABLE)));
+ StreamsGroupDescribeResponse response = new
StreamsGroupDescribeResponse(data);
+ assertThrows(UnsupportedVersionException.class, () ->
response.serialize((short) 0));
+ }
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 03bac25b016..a530454cb91 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -248,6 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_SHARE_GROUP_OFFSETS =>
handleDeleteShareGroupOffsetsRequest(request).exceptionally(handleError)
case ApiKeys.STREAMS_GROUP_DESCRIBE =>
handleStreamsGroupDescribe(request).exceptionally(handleError)
case ApiKeys.STREAMS_GROUP_HEARTBEAT =>
handleStreamsGroupHeartbeat(request).exceptionally(handleError)
+ case ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE =>
handleStreamsGroupTopologyDescriptionUpdate(request).exceptionally(handleError)
case _ => throw new IllegalStateException(s"No handler for request api
key ${request.header.apiKey}")
}
} catch {
@@ -2897,6 +2898,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ // Stub handler for KIP-1331. The full handler lands in a later sub-task;
until then this
+ // responds with UNSUPPORTED_VERSION so callers fail loudly rather than hit
the IllegalStateException
+ // default branch in handle().
+ def handleStreamsGroupTopologyDescriptionUpdate(request: Request):
CompletableFuture[Unit] = {
+ val updateRequest =
request.body(classOf[StreamsGroupTopologyDescriptionUpdateRequest])
+ requestHelper.sendMaybeThrottle(request,
updateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ CompletableFuture.completedFuture[Unit](())
+ }
+
def handleStreamsGroupDescribe(request: Request): CompletableFuture[Unit] = {
val streamsGroupDescribeRequest =
request.body(classOf[StreamsGroupDescribeRequest])
val includeAuthorizedOperations =
streamsGroupDescribeRequest.data.includeAuthorizedOperations
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 639328798a0..02c523f0d3f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -758,6 +758,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.STREAMS_GROUP_DESCRIBE =>
new StreamsGroupDescribeRequest.Builder(new
StreamsGroupDescribeRequestData())
+ case ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE =>
+ new StreamsGroupTopologyDescriptionUpdateRequest.Builder(new
StreamsGroupTopologyDescriptionUpdateRequestData())
+
case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS =>
new DescribeShareGroupOffsetsRequest.Builder(new
DescribeShareGroupOffsetsRequestData())
diff --git
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index 09602ea498d..023ef02a5f6 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -179,6 +179,8 @@ import
org.apache.kafka.common.message.StreamsGroupDescribeRequestDataJsonConver
import
org.apache.kafka.common.message.StreamsGroupDescribeResponseDataJsonConverter;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter;
import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter;
import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter;
@@ -362,6 +364,8 @@ import
org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import
org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateRequest;
+import
org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -564,6 +568,8 @@ public class RequestConvertToJson {
UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest)
request).data(), request.version());
case UPDATE_RAFT_VOTER ->
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest)
request).data(), request.version());
+ case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE ->
+
StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateRequest)
request).data(), request.version());
case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest)
request).data(), request.version());
case WRITE_SHARE_GROUP_STATE ->
WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest)
request).data(), request.version());
@@ -741,6 +747,8 @@ public class RequestConvertToJson {
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse)
response).data(), version);
case UPDATE_RAFT_VOTER ->
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse)
response).data(), version);
+ case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE ->
+
StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateResponse)
response).data(), version);
case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse)
response).data(), version);
case WRITE_SHARE_GROUP_STATE ->
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
response).data(), version);