This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06f699664bd KAFKA-20415: Add share.version=2 for gated DLQ support. 
(#21994)
06f699664bd is described below

commit 06f699664bd016b928bf8064710f68eb7dfb136a
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Apr 9 01:06:42 2026 +0530

    KAFKA-20415: Add share.version=2 for gated DLQ support. (#21994)
    
    * DLQ support for share groups (KIP-1191) will be gated by a
    `share.version` upgrade, slated for 4.4.
    * This PR adds `ShareVersion.SV_2` (associated with
    `MetadataVersion.IBP_4_4_IV0`) and updates the tests in
    `FeatureCommandTest` and `AbstractApiVersionsRequestTest`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../unit/kafka/server/AbstractApiVersionsRequestTest.scala    |  2 +-
 .../java/org/apache/kafka/server/common/MetadataVersion.java  |  3 +++
 .../java/org/apache/kafka/server/common/ShareVersion.java     | 11 ++++++++++-
 .../test/java/org/apache/kafka/tools/FeatureCommandTest.java  |  8 ++++----
 4 files changed, 18 insertions(+), 6 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 958c8440c2c..28426e825ef 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -78,7 +78,7 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
       assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
 
       assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
-      assertEquals(ShareVersion.SV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
+      assertEquals(ShareVersion.SV_2.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
 
       assertEquals(0, 
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).minVersion())
       assertEquals(StreamsVersion.SV_1.featureLevel(), 
apiVersionsResponse.data().supportedFeatures().find(StreamsVersion.FEATURE_NAME).maxVersion())
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 406ccbeebcb..4177d1bc859 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -130,6 +130,9 @@ public enum MetadataVersion {
     // they have set the configuration unstable.feature.versions.enable=true.
     // Please move this comment when updating the LATEST_PRODUCTION constant.
     //
+
+    // IBP_4_4_IV0 enables dead-letter queue support for share groups (KIP-1191). When this version
+    // is finalized, DLQ support will be finalized as well.
     IBP_4_4_IV0(31, "4.4", "IV0", false);
 
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java 
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
index f1335069333..c109219b478 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/ShareVersion.java
@@ -25,7 +25,10 @@ public enum ShareVersion implements FeatureVersion {
 
     // Version 1 enables share groups (KIP-932).
     // This was a preview in 4.1 which required enabling explicitly.
-    SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of());
+    SV_1(1, MetadataVersion.IBP_4_2_IV0, Map.of()),
+
+    // Version 2 adds support for DLQ (KIP-1191)
+    SV_2(2, MetadataVersion.IBP_4_4_IV0, Map.of());
 
     public static final String FEATURE_NAME = "share.version";
 
@@ -69,12 +72,18 @@ public enum ShareVersion implements FeatureVersion {
         return featureLevel >= SV_1.featureLevel;
     }
 
+    public boolean supportsShareGroupDLQ() {
+        return featureLevel >= SV_2.featureLevel;
+    }
+
     public static ShareVersion fromFeatureLevel(short version) {
         switch (version) {
             case 0:
                 return SV_0;
             case 1:
                 return SV_1;
+            case 2:
+                return SV_2;
             default:
                 throw new RuntimeException("Unknown share feature level: " + 
(int) version);
         }
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 565c78c9eef..f569905d8c3 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -71,7 +71,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(features.get(3))
         );
         assertFeatureOutput(
-                "share.version", "0", "1", "0",
+                "share.version", "0", "2", "0",
                 outputWithoutEpoch(features.get(4))
         );
         assertFeatureOutput(
@@ -111,7 +111,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(features.get(3))
         );
         assertFeatureOutput(
-                "share.version", "0", "1", "0",
+                "share.version", "0", "2", "0",
                 outputWithoutEpoch(features.get(4))
         );
         assertFeatureOutput(
@@ -168,7 +168,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(featuresWithUnstable.get(3))
         );
         assertFeatureOutput(
-                "share.version", "0", "1", "0",
+                "share.version", "0", "2", "0",
                 outputWithoutEpoch(featuresWithUnstable.get(4))
         );
         assertFeatureOutput(
@@ -262,7 +262,7 @@ public class FeatureCommandTest {
                 outputWithoutEpoch(featuresWithUnstable.get(3))
         );
         assertFeatureOutput(
-                "share.version", "0", "1", "0",
+                "share.version", "0", "2", "0",
                 outputWithoutEpoch(featuresWithUnstable.get(4))
         );
         assertFeatureOutput(

Reply via email to