This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb7e3ab3757 KAFKA-20410: Register topic DLQ configs in LogConfig. [2/N] (#22167)
cb7e3ab3757 is described below

commit cb7e3ab375747bfafb97c746a2049443c2ddd9a1
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Apr 29 20:54:40 2026 +0530

    KAFKA-20410: Register topic DLQ configs in LogConfig. [2/N] (#22167)
    
    * Register TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG in
    LogConfig.
    * Update tests.
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>
---
 core/src/test/scala/unit/kafka/log/LogConfigTest.scala            | 1 +
 .../java/org/apache/kafka/storage/internals/log/LogConfig.java    | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index ecef38cfbe7..16b276ff668 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -73,6 +73,7 @@ class LogConfigTest {
       case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "-0.1")
       case TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "remove", "0")
       case TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_number", "remove", "0")
+      case TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG => 
assertPropertyInvalid(name, "not_a_boolean")
       case LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG => // no op
 
       case _ => assertPropertyInvalid(name, "not_a_number", "-1")
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 1b1811ca5e7..7c5153825cc 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -248,6 +248,7 @@ public class LogConfig extends AbstractConfig {
                         TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC)
                 .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, 
false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
                 .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, 
BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC)
+                
.define(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, BOOLEAN, false, 
MEDIUM, TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC)
                 .defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, 
null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC);
     }
 
@@ -279,6 +280,7 @@ public class LogConfig extends AbstractConfig {
     public final BrokerCompressionType compressionType;
     public final Optional<Compression> compression;
     public final boolean preallocate;
+    public final boolean errorsDeadletterqueueGroupEnable;
 
     public final TimestampType messageTimestampType;
 
@@ -335,6 +337,7 @@ public class LogConfig extends AbstractConfig {
         this.messageTimestampAfterMaxMs = 
getLong(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
         this.leaderReplicationThrottledReplicas = 
Collections.unmodifiableList(getList(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
         this.followerReplicationThrottledReplicas = 
Collections.unmodifiableList(getList(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG));
+        this.errorsDeadletterqueueGroupEnable = 
getBoolean(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
 
         remoteLogConfig = new RemoteLogConfig(this);
     }
@@ -387,6 +390,10 @@ public class LogConfig extends AbstractConfig {
             return 0;
     }
 
+    public boolean errorsDeadletterqueueGroupEnable() {
+        return errorsDeadletterqueueGroupEnable;
+    }
+
     public boolean remoteStorageEnable() {
         return remoteLogConfig.remoteStorageEnable;
     }
@@ -652,6 +659,7 @@ public class LogConfig extends AbstractConfig {
                 ", followerReplicationThrottledReplicas=" + 
followerReplicationThrottledReplicas +
                 ", remoteLogConfig=" + remoteLogConfig +
                 ", maxMessageSize=" + maxMessageSize +
+                ", errorsDeadletterqueueGroupEnable=" + 
errorsDeadletterqueueGroupEnable +
                 '}';
     }
 

Reply via email to