This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b6d25037102 KAFKA-20410: Add DLQ configuration parameters for Share 
Groups (KIP-1191) (#21979)
b6d25037102 is described below

commit b6d2503710274b809aba58d943c5efa89df41207
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Tue Apr 21 16:58:24 2026 +0530

    KAFKA-20410: Add DLQ configuration parameters for Share Groups (KIP-1191) 
(#21979)
    
    ## Summary
    This PR adds the configuration foundation for Share Groups Dead-Letter
    Queue (DLQ) functionality as specified in [KIP-1191: Dead-letter queues
    for share
    
    
groups](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1191%3A+Dead-letter+queues+for+share+groups).
    
    ## Changes
    
    ### New Configurations
    **Cluster-level configs (GroupCoordinatorConfig.java):**
    1. `errors.deadletterqueue.auto.create.topics.enable` (default: `false`)
         - Enables automatic creation of DLQ topics
    2. `errors.deadletterqueue.topic.name.prefix` (default: `"dlq."`)
         - Required prefix for DLQ topic names when auto-create is enabled
    
    **Group-level configs (GroupConfig.java):**
    1. `errors.deadletterqueue.topic.name` (default: `""`)
         - Specifies the DLQ topic name for a share group
         - Empty string means DLQ disabled for that group
    2. `errors.deadletterqueue.copy.record.enable` (default: `false`)
         - When `true`: Copy full original record to DLQ
         - When `false`: Copy only context metadata (offset, delivery count,
    reason)
    
    **Topic-level config (TopicConfig.java):**
    1. `errors.deadletterqueue.group.enable` (default: not set)
         - Marks a topic as eligible for use as a DLQ
    
    ### Validation Logic
    - DLQ topic names **must not** start with `__` (reserved for internal
    topics)
    
    Reviewers: David Jacot <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../apache/kafka/common/config/TopicConfig.java    |  6 +++
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  7 ++-
 .../kafka/coordinator/group/GroupConfig.java       | 56 +++++++++++++++++++-
 .../coordinator/group/GroupCoordinatorConfig.java  | 35 +++++++++++++
 .../kafka/coordinator/group/GroupConfigTest.java   | 60 +++++++++++++++++++++-
 .../group/GroupCoordinatorConfigTest.java          | 34 ++++++++++++
 7 files changed, 196 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index e97c39bc619..5e63163f91c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -235,4 +235,10 @@ public class TopicConfig {
     @Deprecated
     public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = 
"Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
         "hence this configuration is no-op and it is deprecated for removal in 
Apache Kafka 5.0.";
+
+    // Dead Letter Queue Configuration (KIP-1191)
+    public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG = 
"errors.deadletterqueue.group.enable";
+    public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC = 
"Enable this topic to be used as a dead-letter queue for share groups. " +
+        "When set to <code>true</code>, share groups can write undeliverable 
records to this topic. When set to <code>false</code> (the default), " +
+        "attempts to use this topic as a DLQ will be rejected.";
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3237abade89..3fdcdaf3e49 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerd
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
 import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -379,6 +379,8 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
     cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
+    cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
+    cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
 
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 90dffba4b48..3c7f2ee1368 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -30,9 +30,8 @@ import 
org.apache.kafka.common.record.internal.{CompressionType, Records}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.utils.LogCaptureAppender
-import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
+import org.apache.kafka.coordinator.group.{ConsumerGroupMigrationPolicy, 
GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.Group.GroupType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
@@ -1071,6 +1070,10 @@ class KafkaConfigTest {
         case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case 
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case 
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG 
=> assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case 
GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => // 
ignore string
+        case GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore 
string
+        case GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
 
         /** Streams groups configs */
         case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 07247bf76ab..0404b299bf9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -105,6 +105,14 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"streams.task.offset.interval.ms";
 
+    public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = 
"errors.deadletterqueue.topic.name";
+    public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
+    public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The 
name of the topic to be used as the dead-letter queue (DLQ) topic for this 
share group. If blank (the default), the group does not have a DLQ topic.";
+
+    public static final String 
ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG = 
"errors.deadletterqueue.copy.record.enable";
+    public static final boolean 
ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT = false;
+    public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC = 
"When writing onto the dead-letter queue topic, whether to copy the original 
record onto the DLQ topic, or just write a record containing the context 
information headers.";
+
     private final Optional<Integer> consumerSessionTimeoutMs;
 
     private final Optional<Integer> consumerHeartbeatIntervalMs;
@@ -147,6 +155,10 @@ public final class GroupConfig extends AbstractConfig {
 
     private final Optional<Boolean> shareRenewAcknowledgeEnable;
 
+    public final String errorsDLQTopicName;
+
+    public final boolean errorsDLQCopyRecordEnable;
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             INT,
@@ -269,7 +281,19 @@ public final class GroupConfig extends AbstractConfig {
             
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
             atLeast(1),
             MEDIUM,
-            GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
+            GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
+
+        // DLQ configurations (KIP-1191)
+        .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
+            STRING,
+            ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT,
+            MEDIUM,
+            ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)
+        .define(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG,
+            BOOLEAN,
+            ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT,
+            MEDIUM,
+            ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC);
 
     /**
      * Mapping from GroupConfig name to its broker-level synonym config name.
@@ -301,7 +325,11 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
         Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
         Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
-        Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
+        Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
+
+        // DLQ configs
+        Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
+        Map.entry(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, 
Optional.empty())
     );
 
     /**
@@ -337,6 +365,8 @@ public final class GroupConfig extends AbstractConfig {
         this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
             .map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
         this.shareRenewAcknowledgeEnable = 
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
+        this.errorsDLQTopicName = 
getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
+        this.errorsDLQCopyRecordEnable = 
getBoolean(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG);
     }
 
     private Optional<Integer> optionalInt(String key) {
@@ -535,6 +565,14 @@ public final class GroupConfig extends AbstractConfig {
             STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
         );
+
+        // DLQ validation (KIP-1191)
+        // DLQ topic name must not start with "__" (reserved for internal 
topics)
+        String dlqTopicName = (String) 
parsed.get(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
+        if (dlqTopicName != null && !dlqTopicName.isEmpty() && 
dlqTopicName.startsWith("__")) {
+            throw new 
InvalidConfigurationException(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG +
+                ": DLQ topic name must not start with '__'");
+        }
     }
 
     /**
@@ -1060,6 +1098,20 @@ public final class GroupConfig extends AbstractConfig {
         return shareRenewAcknowledgeEnable;
     }
 
+    /**
+     * The DLQ topic name for this group.
+     */
+    public String errorsDLQTopicName() {
+        return errorsDLQTopicName;
+    }
+
+    /**
+     * Whether to copy the original record to the DLQ topic.
+     */
+    public boolean errorsDLQCopyRecordEnable() {
+        return errorsDLQCopyRecordEnable;
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + 
config));
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 6d04a812ab4..30e79e4631c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -314,6 +314,17 @@ public class GroupCoordinatorConfig {
     public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC = 
"Time elapsed before retrying initialize share group state request. " +
         "If below offsets.commit.timeout.ms, then value of 
offsets.commit.timeout.ms is used.";
 
+    ///
+    /// DLQ configs (KIP-1191)
+    ///
+    public static final String 
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG = 
"errors.deadletterqueue.auto.create.topics.enable";
+    public static final boolean 
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT = false;
+    public static final String 
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC = "Whether automatic 
creation of DLQ topics is enabled (KIP-1191). When a share group has a DLQ 
topic configured, this setting controls whether the broker will automatically 
create the topic if it does not exist.";
+
+    public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG 
= "errors.deadletterqueue.topic.name.prefix";
+    public static final String 
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT = "dlq.";
+    public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC = 
"The required prefix of topic names used by dead-letter queue topics for share 
groups. When set to \"\", there is no restriction on the names used for 
dead-letter queue topics.";
+
     ///
     /// Streams group configs
     ///
@@ -451,6 +462,10 @@ public class GroupCoordinatorConfig {
         .define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
 
+        // DLQ configs (KIP-1191)
+        .define(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
BOOLEAN, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT, MEDIUM, 
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC)
+        .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, STRING, 
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT, MEDIUM, 
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC)
+
         // Streams group configs
         .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
         .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
@@ -515,6 +530,9 @@ public class GroupCoordinatorConfig {
     private final int shareGroupMinAssignmentIntervalMs;
     private final int shareGroupMaxAssignmentIntervalMs;
     private final int shareGroupInitializeRetryIntervalMs;
+    // DLQ configurations
+    private final boolean errorsDLQAutoCreateTopicsEnable;
+    private final String errorsDLQTopicNamePrefix;
     // Streams group configurations
     private final int streamsGroupSessionTimeoutMs;
     private final int streamsGroupMinSessionTimeoutMs;
@@ -579,6 +597,9 @@ public class GroupCoordinatorConfig {
         this.shareGroupMinAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.shareGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.shareGroupInitializeRetryIntervalMs = Math.max(initializeRetryMs, 
this.offsetCommitTimeoutMs);
+        // DLQ configurations
+        this.errorsDLQAutoCreateTopicsEnable = 
config.getBoolean(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG);
+        this.errorsDLQTopicNamePrefix = 
config.getString(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG);
         // Streams group configurations
         this.streamsGroupSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
         this.streamsGroupMinSessionTimeoutMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@@ -1188,6 +1209,20 @@ public class GroupCoordinatorConfig {
         return shareGroupInitializeRetryIntervalMs;
     }
 
+    /**
+     * Whether automatic creation of DLQ topics is enabled.
+     */
+    public boolean errorsDLQAutoCreateTopicsEnable() {
+        return errorsDLQAutoCreateTopicsEnable;
+    }
+
+    /**
+     * The required prefix for DLQ topic names.
+     */
+    public String errorsDLQTopicNamePrefix() {
+        return errorsDLQTopicNamePrefix;
+    }
+
     /**
      * The streams group session timeout in milliseconds.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 96df728aa2c..a79ec3ac1fc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -61,6 +61,7 @@ import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.S
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -126,7 +127,9 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
-            } else {
+            } else if 
(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_boolean");
+            } else if 
(!GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1");
             }
         });
@@ -425,6 +428,10 @@ public class GroupConfigTest {
         assertEquals(Optional.empty(), config.streamsAssignmentIntervalMs());
         assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
         assertEquals(Optional.empty(), config.streamsTaskOffsetIntervalMs());
+
+        // DLQ configs - have defaults from CONFIG_DEF
+        assertEquals("", config.errorsDLQTopicName());
+        assertFalse(config.errorsDLQCopyRecordEnable());
     }
 
     @Test
@@ -452,6 +459,8 @@ public class GroupConfigTest {
         props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1250");
         props.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
         props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "30000");
+        props.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, 
"my-dlq-topic");
+        
props.put(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "true");
 
         GroupConfig config = new GroupConfig(props);
 
@@ -481,6 +490,10 @@ public class GroupConfigTest {
         assertEquals(Optional.of(1250), config.streamsAssignmentIntervalMs());
         assertEquals(Optional.of(false), 
config.streamsAssignorOffloadEnable());
         assertEquals(Optional.of(30000), config.streamsTaskOffsetIntervalMs());
+
+        // DLQ configs
+        assertEquals("my-dlq-topic", config.errorsDLQTopicName());
+        assertTrue(config.errorsDLQCopyRecordEnable());
     }
 
     @Test
@@ -776,4 +789,49 @@ public class GroupConfigTest {
                 SHARE_GROUP_DELIVERY_COUNT_LIMIT, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT,
                 SHARE_GROUP_RECORD_LOCK_DURATION_MS, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, 
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
     }
+
+    @Test
+    public void testDLQConfigDefaults() {
+        // Test default DLQ configuration values (KIP-1191)
+        Map<String, String> configs = new HashMap<>();
+        GroupConfig config = new GroupConfig(configs);
+
+        assertEquals("", config.errorsDLQTopicName());
+        assertFalse(config.errorsDLQCopyRecordEnable());
+    }
+
+    @Test
+    public void testDLQConfigCustomValues() {
+        // Test custom DLQ configuration values
+        Map<String, String> configs = new HashMap<>();
+        configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, 
"my-dlq-topic");
+        
configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, 
"true");
+
+        GroupConfig config = new GroupConfig(configs);
+
+        assertEquals("my-dlq-topic", config.errorsDLQTopicName());
+        assertTrue(config.errorsDLQCopyRecordEnable());
+    }
+
+    @Test
+    public void testDLQTopicNameCannotStartWithDoubleUnderscore() {
+        // DLQ topic name must not start with "__" (reserved for internal 
topics)
+        Map<String, String> configs = new HashMap<>();
+        configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, 
"__my-dlq");
+
+        InvalidConfigurationException exception = 
assertThrows(InvalidConfigurationException.class, () ->
+            GroupConfig.validate(configs, createGroupCoordinatorConfig(), 
createShareGroupConfig()));
+        assertTrue(exception.getMessage().contains("DLQ topic name must not 
start with '__'"));
+    }
+
+    @Test
+    public void testDLQBlankTopicNameIsValid() {
+        // Blank DLQ topic name is valid (means DLQ is disabled for that group)
+        Map<String, String> configs = new HashMap<>();
+        configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "");
+
+        GroupConfig config = new GroupConfig(configs);
+
+        assertEquals("", config.errorsDLQTopicName());
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index a1eff74238f..f9dc304e16d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -724,6 +724,40 @@ public class GroupCoordinatorConfigTest {
         assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
     }
 
+    @Test
+    public void testDLQAutoCreateTopicsEnableDefaultValue() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(false, config.errorsDLQAutoCreateTopicsEnable());
+        
assertEquals(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT,
+            config.errorsDLQAutoCreateTopicsEnable());
+    }
+
+    @Test
+    public void testDLQAutoCreateTopicsEnableCustomValue() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
 true);
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(true, config.errorsDLQAutoCreateTopicsEnable());
+    }
+
+    @Test
+    public void testDLQTopicNamePrefixDefaultValue() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals("dlq.", config.errorsDLQTopicNamePrefix());
+        
assertEquals(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT,
+            config.errorsDLQTopicNamePrefix());
+    }
+
+    @Test
+    public void testDLQTopicNamePrefixCustomValue() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG,
 "my-dlq-");
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix());
+    }
+
     public static GroupCoordinatorConfig createConfig(Map<String, Object> 
configs) {
         return new GroupCoordinatorConfig(new AbstractConfig(
             GroupCoordinatorConfig.CONFIG_DEF,

Reply via email to