This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b6d25037102 KAFKA-20410: Add DLQ configuration parameters for Share
Groups (KIP-1191) (#21979)
b6d25037102 is described below
commit b6d2503710274b809aba58d943c5efa89df41207
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Tue Apr 21 16:58:24 2026 +0530
KAFKA-20410: Add DLQ configuration parameters for Share Groups (KIP-1191)
(#21979)
## Summary
This PR adds the configuration foundation for Share Groups Dead-Letter
Queue (DLQ) functionality as specified in
[KIP-1191: Dead-letter queues for share groups](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1191%3A+Dead-letter+queues+for+share+groups).
## Changes
### New Configurations
**Cluster-level configs (GroupCoordinatorConfig.java):**
1. `errors.deadletterqueue.auto.create.topics.enable` (default: `false`)
- Enables automatic creation of DLQ topics
2. `errors.deadletterqueue.topic.name.prefix` (default: `"dlq."`)
- Required prefix for DLQ topic names when auto-create is enabled
**Group-level configs (GroupConfig.java):**
1. `errors.deadletterqueue.topic.name` (default: `""`)
- Specifies the DLQ topic name for a share group
- Empty string means DLQ disabled for that group
2. `errors.deadletterqueue.copy.record.enable` (default: `false`)
- When `true`: Copy full original record to DLQ
- When `false`: Copy only context metadata (offset, delivery count,
reason)
**Topic-level config (TopicConfig.java):**
1. `errors.deadletterqueue.group.enable` (default: not set; the config doc describes `false` as the default)
- Marks a topic as eligible for use as a DLQ
### Validation Logic
- DLQ topic names **must not** start with `__` (reserved for internal
topics)
Reviewers: David Jacot <[email protected]>, Andrew Schofield
<[email protected]>
---
.../apache/kafka/common/config/TopicConfig.java | 6 +++
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 7 ++-
.../kafka/coordinator/group/GroupConfig.java | 56 +++++++++++++++++++-
.../coordinator/group/GroupCoordinatorConfig.java | 35 +++++++++++++
.../kafka/coordinator/group/GroupConfigTest.java | 60 +++++++++++++++++++++-
.../group/GroupCoordinatorConfigTest.java | 34 ++++++++++++
7 files changed, 196 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index e97c39bc619..5e63163f91c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -235,4 +235,10 @@ public class TopicConfig {
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC =
"Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
"hence this configuration is no-op and it is deprecated for removal in
Apache Kafka 5.0.";
+
+ // Dead Letter Queue Configuration (KIP-1191)
+ public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG =
"errors.deadletterqueue.group.enable";
+ public static final String ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC =
"Enable this topic to be used as a dead-letter queue for share groups. " +
+ "When set to <code>true</code>, share groups can write undeliverable
records to this topic. When set to <code>false</code> (the default), " +
+ "attempts to use this topic as a DLQ will be rejected.";
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3237abade89..3fdcdaf3e49 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerd
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -379,6 +379,8 @@ class KafkaApisTest extends Logging {
cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
+ cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
+ cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 90dffba4b48..3c7f2ee1368 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -30,9 +30,8 @@ import
org.apache.kafka.common.record.internal.{CompressionType, Records}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.utils.LogCaptureAppender
-import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
+import org.apache.kafka.coordinator.group.{ConsumerGroupMigrationPolicy,
GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.Group.GroupType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
@@ -1071,6 +1070,10 @@ class KafkaConfigTest {
case GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+ case
GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => //
ignore string
+ case GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore
string
+ case GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
/** Streams groups configs */
case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 07247bf76ab..0404b299bf9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -105,6 +105,14 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG =
"streams.task.offset.interval.ms";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG =
"errors.deadletterqueue.topic.name";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The
name of the topic to be used as the dead-letter queue (DLQ) topic for this
share group. If blank (the default), the group does not have a DLQ topic.";
+
+ public static final String
ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG =
"errors.deadletterqueue.copy.record.enable";
+ public static final boolean
ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT = false;
+ public static final String ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC =
"When writing onto the dead-letter queue topic, whether to copy the original
record onto the DLQ topic, or just write a record containing the context
information headers.";
+
private final Optional<Integer> consumerSessionTimeoutMs;
private final Optional<Integer> consumerHeartbeatIntervalMs;
@@ -147,6 +155,10 @@ public final class GroupConfig extends AbstractConfig {
private final Optional<Boolean> shareRenewAcknowledgeEnable;
+ public final String errorsDLQTopicName;
+
+ public final boolean errorsDLQCopyRecordEnable;
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
@@ -269,7 +281,19 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
atLeast(1),
MEDIUM,
- GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
+ GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
+
+ // DLQ configurations (KIP-1191)
+ .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
+ STRING,
+ ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT,
+ MEDIUM,
+ ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC)
+ .define(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG,
+ BOOLEAN,
+ ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DEFAULT,
+ MEDIUM,
+ ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_DOC);
/**
* Mapping from GroupConfig name to its broker-level synonym config name.
@@ -301,7 +325,11 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
- Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
+ Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
+
+ // DLQ configs
+ Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
+ Map.entry(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG,
Optional.empty())
);
/**
@@ -337,6 +365,8 @@ public final class GroupConfig extends AbstractConfig {
this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
.map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
this.shareRenewAcknowledgeEnable =
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
+ this.errorsDLQTopicName =
getString(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
+ this.errorsDLQCopyRecordEnable =
getBoolean(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG);
}
private Optional<Integer> optionalInt(String key) {
@@ -535,6 +565,14 @@ public final class GroupConfig extends AbstractConfig {
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
);
+
+ // DLQ validation (KIP-1191)
+ // DLQ topic name must not start with "__" (reserved for internal
topics)
+ String dlqTopicName = (String)
parsed.get(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG);
+ if (dlqTopicName != null && !dlqTopicName.isEmpty() &&
dlqTopicName.startsWith("__")) {
+ throw new
InvalidConfigurationException(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG +
+ ": DLQ topic name must not start with '__'");
+ }
}
/**
@@ -1060,6 +1098,20 @@ public final class GroupConfig extends AbstractConfig {
return shareRenewAcknowledgeEnable;
}
+ /**
+ * The DLQ topic name for this group.
+ */
+ public String errorsDLQTopicName() {
+ return errorsDLQTopicName;
+ }
+
+ /**
+ * Whether to copy the original record to the DLQ topic.
+ */
+ public boolean errorsDLQCopyRecordEnable() {
+ return errorsDLQCopyRecordEnable;
+ }
+
public static void main(String[] args) {
System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" +
config));
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 6d04a812ab4..30e79e4631c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -314,6 +314,17 @@ public class GroupCoordinatorConfig {
public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC =
"Time elapsed before retrying initialize share group state request. " +
"If below offsets.commit.timeout.ms, then value of
offsets.commit.timeout.ms is used.";
+ ///
+ /// DLQ configs (KIP-1191)
+ ///
+ public static final String
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG =
"errors.deadletterqueue.auto.create.topics.enable";
+ public static final boolean
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT = false;
+ public static final String
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC = "Whether automatic
creation of DLQ topics is enabled (KIP-1191). When a share group has a DLQ
topic configured, this setting controls whether the broker will automatically
create the topic if it does not exist.";
+
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG
= "errors.deadletterqueue.topic.name.prefix";
+ public static final String
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT = "dlq.";
+ public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC =
"The required prefix of topic names used by dead-letter queue topics for share
groups. When set to \"\", there is no restriction on the names used for
dead-letter queue topics.";
+
///
/// Streams group configs
///
@@ -451,6 +462,10 @@ public class GroupCoordinatorConfig {
.define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
+ // DLQ configs (KIP-1191)
+ .define(ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
BOOLEAN, ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT, MEDIUM,
ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DOC)
+ .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG, STRING,
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT, MEDIUM,
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DOC)
+
// Streams group configs
.define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT,
STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
.define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT,
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
@@ -515,6 +530,9 @@ public class GroupCoordinatorConfig {
private final int shareGroupMinAssignmentIntervalMs;
private final int shareGroupMaxAssignmentIntervalMs;
private final int shareGroupInitializeRetryIntervalMs;
+ // DLQ configurations
+ private final boolean errorsDLQAutoCreateTopicsEnable;
+ private final String errorsDLQTopicNamePrefix;
// Streams group configurations
private final int streamsGroupSessionTimeoutMs;
private final int streamsGroupMinSessionTimeoutMs;
@@ -579,6 +597,9 @@ public class GroupCoordinatorConfig {
this.shareGroupMinAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.shareGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.shareGroupInitializeRetryIntervalMs = Math.max(initializeRetryMs,
this.offsetCommitTimeoutMs);
+ // DLQ configurations
+ this.errorsDLQAutoCreateTopicsEnable =
config.getBoolean(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG);
+ this.errorsDLQTopicNamePrefix =
config.getString(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG);
// Streams group configurations
this.streamsGroupSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG);
this.streamsGroupMinSessionTimeoutMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
@@ -1188,6 +1209,20 @@ public class GroupCoordinatorConfig {
return shareGroupInitializeRetryIntervalMs;
}
+ /**
+ * Whether automatic creation of DLQ topics is enabled.
+ */
+ public boolean errorsDLQAutoCreateTopicsEnable() {
+ return errorsDLQAutoCreateTopicsEnable;
+ }
+
+ /**
+ * The required prefix for DLQ topic names.
+ */
+ public String errorsDLQTopicNamePrefix() {
+ return errorsDLQTopicNamePrefix;
+ }
+
/**
* The streams group session timeout in milliseconds.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 96df728aa2c..a79ec3ac1fc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -61,6 +61,7 @@ import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.S
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -126,7 +127,9 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
- } else {
+ } else if
(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_boolean");
+ } else if
(!GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1");
}
});
@@ -425,6 +428,10 @@ public class GroupConfigTest {
assertEquals(Optional.empty(), config.streamsAssignmentIntervalMs());
assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
assertEquals(Optional.empty(), config.streamsTaskOffsetIntervalMs());
+
+ // DLQ configs - have defaults from CONFIG_DEF
+ assertEquals("", config.errorsDLQTopicName());
+ assertFalse(config.errorsDLQCopyRecordEnable());
}
@Test
@@ -452,6 +459,8 @@ public class GroupConfigTest {
props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1250");
props.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "30000");
+ props.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
"my-dlq-topic");
+
props.put(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "true");
GroupConfig config = new GroupConfig(props);
@@ -481,6 +490,10 @@ public class GroupConfigTest {
assertEquals(Optional.of(1250), config.streamsAssignmentIntervalMs());
assertEquals(Optional.of(false),
config.streamsAssignorOffloadEnable());
assertEquals(Optional.of(30000), config.streamsTaskOffsetIntervalMs());
+
+ // DLQ configs
+ assertEquals("my-dlq-topic", config.errorsDLQTopicName());
+ assertTrue(config.errorsDLQCopyRecordEnable());
}
@Test
@@ -776,4 +789,49 @@ public class GroupConfigTest {
SHARE_GROUP_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
}
+
+ @Test
+ public void testDLQConfigDefaults() {
+ // Test default DLQ configuration values (KIP-1191)
+ Map<String, String> configs = new HashMap<>();
+ GroupConfig config = new GroupConfig(configs);
+
+ assertEquals("", config.errorsDLQTopicName());
+ assertFalse(config.errorsDLQCopyRecordEnable());
+ }
+
+ @Test
+ public void testDLQConfigCustomValues() {
+ // Test custom DLQ configuration values
+ Map<String, String> configs = new HashMap<>();
+ configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
"my-dlq-topic");
+
configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG,
"true");
+
+ GroupConfig config = new GroupConfig(configs);
+
+ assertEquals("my-dlq-topic", config.errorsDLQTopicName());
+ assertTrue(config.errorsDLQCopyRecordEnable());
+ }
+
+ @Test
+ public void testDLQTopicNameCannotStartWithDoubleUnderscore() {
+ // DLQ topic name must not start with "__" (reserved for internal
topics)
+ Map<String, String> configs = new HashMap<>();
+ configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
"__my-dlq");
+
+ InvalidConfigurationException exception =
assertThrows(InvalidConfigurationException.class, () ->
+ GroupConfig.validate(configs, createGroupCoordinatorConfig(),
createShareGroupConfig()));
+ assertTrue(exception.getMessage().contains("DLQ topic name must not
start with '__'"));
+ }
+
+ @Test
+ public void testDLQBlankTopicNameIsValid() {
+ // Blank DLQ topic name is valid (means DLQ is disabled for that group)
+ Map<String, String> configs = new HashMap<>();
+ configs.put(GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "");
+
+ GroupConfig config = new GroupConfig(configs);
+
+ assertEquals("", config.errorsDLQTopicName());
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index a1eff74238f..f9dc304e16d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -724,6 +724,40 @@ public class GroupCoordinatorConfigTest {
assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
}
+ @Test
+ public void testDLQAutoCreateTopicsEnableDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(false, config.errorsDLQAutoCreateTopicsEnable());
+
assertEquals(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_DEFAULT,
+ config.errorsDLQAutoCreateTopicsEnable());
+ }
+
+ @Test
+ public void testDLQAutoCreateTopicsEnableCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
true);
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(true, config.errorsDLQAutoCreateTopicsEnable());
+ }
+
+ @Test
+ public void testDLQTopicNamePrefixDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals("dlq.", config.errorsDLQTopicNamePrefix());
+
assertEquals(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_DEFAULT,
+ config.errorsDLQTopicNamePrefix());
+ }
+
+ @Test
+ public void testDLQTopicNamePrefixCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG,
"my-dlq-");
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix());
+ }
+
public static GroupCoordinatorConfig createConfig(Map<String, Object>
configs) {
return new GroupCoordinatorConfig(new AbstractConfig(
GroupCoordinatorConfig.CONFIG_DEF,