This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06f699664bd KAFKA-20415: Add share.version=2 for gated DLQ support.
(#21994)
06f699664bd is described below
commit 06f699664bd016b928bf8064710f68eb7dfb136a
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Apr 9 01:06:42 2026 +0530
KAFKA-20415: Add share.version=2 for gated DLQ support. (#21994)
* DLQ support for share groups (KIP-1191) will be gated by a
`share.version` upgrade, slated for 4.4.
* This PR adds `SV_2` to `ShareVersion`, tied to `MetadataVersion.IBP_4_4_IV0`, and
updates the tests in `FeatureCommandTest` and `AbstractApiVersionsRequestTest`.
Reviewers: Andrew Schofield <[email protected]>
---
.../unit/kafka/server/AbstractApiVersionsRequestTest.scala | 2 +-
.../java/org/apache/kafka/server/common/MetadataVersion.java | 3 +++
.../java/org/apache/kafka/server/common/ShareVersion.java | 11 ++++++++++-
.../test/java/org/apache/kafka/tools/FeatureCommandTest.java | 8 ++++----
4 files changed, 18 insertions(+), 6 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 958c8440c2c..28426e825ef 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -78,7 +78,7 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
- assertEquals(ShareVersion.SV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
+ assertEquals(ShareVersion.SV_2.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
assertEquals(StreamsVersion.SV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 406ccbeebcb..4177d1bc859 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -130,6 +130,9 @@ public enum MetadataVersion {
// they have set the configuration unstable.feature.versions.enable=true.
// Please move this comment when updating the LATEST_PRODUCTION constant.
//
+
+ // IBP_4_4_IV0 enables dead-letter queue support for share groups (KIP-1191). When this version
+ // is finalized, DLQ support will be finalized as well.
IBP_4_4_IV0(31, "4.4", "IV0", false);
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
index f1335069333..c109219b478 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
@@ -25,7 +25,10 @@ public enum ShareVersion implements FeatureVersion {
// Version 1 enables share groups (KIP-932).
// This was a preview in 4.1 which required enabling explicitly.
- SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of());
+ SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of()),
+
+ // Version 2 adds support for DLQ (KIP-1191).
+ SV_2(2, MetadataVersion.IBP_4_4_IV0, Map.of());
public static final String FEATURE_NAME = "share.version";
@@ -69,12 +72,18 @@ public enum ShareVersion implements FeatureVersion {
return featureLevel >= SV_1.featureLevel;
}
+ public boolean supportsShareGroupDLQ() {
+ return featureLevel >= SV_2.featureLevel;
+ }
+
public static ShareVersion fromFeatureLevel(short version) {
switch (version) {
case 0:
return SV_0;
case 1:
return SV_1;
+ case 2:
+ return SV_2;
default:
throw new RuntimeException("Unknown share feature level: " +
(int) version);
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 565c78c9eef..f569905d8c3 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -71,7 +71,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(features.get(3))
);
assertFeatureOutput(
- "share.version", "0", "1", "0",
+ "share.version", "0", "2", "0",
outputWithoutEpoch(features.get(4))
);
assertFeatureOutput(
@@ -111,7 +111,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(features.get(3))
);
assertFeatureOutput(
- "share.version", "0", "1", "0",
+ "share.version", "0", "2", "0",
outputWithoutEpoch(features.get(4))
);
assertFeatureOutput(
@@ -168,7 +168,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(featuresWithUnstable.get(3))
);
assertFeatureOutput(
- "share.version", "0", "1", "0",
+ "share.version", "0", "2", "0",
outputWithoutEpoch(featuresWithUnstable.get(4))
);
assertFeatureOutput(
@@ -262,7 +262,7 @@ public class FeatureCommandTest {
outputWithoutEpoch(featuresWithUnstable.get(3))
);
assertFeatureOutput(
- "share.version", "0", "1", "0",
+ "share.version", "0", "2", "0",
outputWithoutEpoch(featuresWithUnstable.get(4))
);
assertFeatureOutput(