This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8889765c5ba KAFKA-20549: Intd new cache helper interface for DLQ 
manager. [2/N] (#22241)
8889765c5ba is described below

commit 8889765c5baebe0e1bd181115b00be65f72dd220
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue May 12 14:23:18 2026 +0530

    KAFKA-20549: Intd new cache helper interface for DLQ manager. [2/N] (#22241)
    
    * Add new interface for metadata cache helper for
    `ShareGroupDLQStateManger`.
    * The impl of the new interface exposes logic to fetch new dynamic
    configs added as part of KIP-1191.
    * Additional plumbing in `ShareGroupDLQStateManger` also added.
    * Expose cluster level DLQ configs in `GroupConfigManager`.
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../ShareCoordinatorMetadataCacheHelperImpl.java   |  47 +++-
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 ...hareCoordinatorMetadataCacheHelperImplTest.java | 290 ++++++++++++++++++++-
 .../coordinator/group/GroupConfigManager.java      |   8 +
 .../coordinator/group/GroupConfigManagerTest.java  |  50 ++++
 .../dlq/ShareGroupDLQMetadataCacheHelper.java      |  83 ++++++
 .../share/dlq/ShareGroupDLQStateManager.java       | 134 +++++++++-
 7 files changed, 586 insertions(+), 28 deletions(-)

diff --git 
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
 
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
index ed9be995415..ad556514891 100644
--- 
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
+++ 
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -18,12 +18,16 @@
 package kafka.server.share;
 
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper;
 import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
 
 import org.slf4j.Logger;
@@ -33,23 +37,27 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.Set;
 import java.util.function.Function;
 
-public class ShareCoordinatorMetadataCacheHelperImpl implements 
ShareCoordinatorMetadataCacheHelper {
+public class ShareCoordinatorMetadataCacheHelperImpl implements 
ShareCoordinatorMetadataCacheHelper, ShareGroupDLQMetadataCacheHelper {
     private final MetadataCache metadataCache;
     private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
     private final ListenerName interBrokerListenerName;
+    private final GroupConfigManager groupConfigManager;
     private final Logger log = 
LoggerFactory.getLogger(ShareCoordinatorMetadataCacheHelperImpl.class);
 
     public ShareCoordinatorMetadataCacheHelperImpl(
         MetadataCache metadataCache,
         Function<SharePartitionKey, Integer> keyToPartitionMapper,
-        ListenerName interBrokerListenerName
+        ListenerName interBrokerListenerName,
+        GroupConfigManager groupConfigManager
     ) {
         this.metadataCache = Objects.requireNonNull(metadataCache, 
"metadataCache must not be null");
         this.keyToPartitionMapper = 
Objects.requireNonNull(keyToPartitionMapper, "keyToPartitionMapper must not be 
null");
         this.interBrokerListenerName = 
Objects.requireNonNull(interBrokerListenerName, "interBrokerListenerName must 
not be null");
+        this.groupConfigManager = Objects.requireNonNull(groupConfigManager, 
"groupConfigManager must not be null");
     }
 
     @Override
@@ -62,6 +70,41 @@ public class ShareCoordinatorMetadataCacheHelperImpl 
implements ShareCoordinator
         return false;
     }
 
+    @Override
+    public Optional<String> shareGroupDlqTopic(String groupId) {
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.map(GroupConfig::errorsDLQTopicName);
+    }
+
+    @Override
+    public boolean isDlqAutoTopicCreateEnabled() {
+        return groupConfigManager.isDlqAutoTopicCreateEnabled();
+    }
+
+    @Override
+    public Optional<String> shareGroupDlqTopicPrefix() {
+        return groupConfigManager.shareGroupDlqTopicPrefix();
+    }
+
+    @Override
+    public boolean isDlqEnabledOnTopic(String topic) {
+        Properties props = metadataCache.topicConfig(topic);
+        if (props == null || props.isEmpty()) {
+            return false;
+        }
+        Object isEnabled = 
props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
+        if (isEnabled instanceof Boolean) {
+            return (boolean) isEnabled;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isShareGroupDlqCopyRecordEnabled(String groupId) {
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return 
groupConfig.map(GroupConfig::errorsDLQCopyRecordEnable).orElse(false);
+    }
+
     @Override
     public Node getShareCoordinator(SharePartitionKey key, String 
internalTopicName) {
         try {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1646c8c6a83..681848ad0ef 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -730,7 +730,7 @@ class BrokerServer(
           .newInstance(
             new PersisterStateManager(
               NetworkUtils.buildNetworkClient("Persister", config, metrics, 
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
-              new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key 
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName),
+              new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key 
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
               Time.SYSTEM,
               shareGroupTimer
             )
diff --git 
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
 
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
index f445e65f985..69e8e235dd2 100644
--- 
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
+++ 
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
@@ -19,11 +19,14 @@ package kafka.server.share;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.MetadataResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.share.SharePartitionKey;
 import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
@@ -32,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,23 +57,34 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         Exception e = assertThrows(NullPointerException.class, () -> new 
ShareCoordinatorMetadataCacheHelperImpl(
             null,
             func,
-            mock(ListenerName.class)
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
         ));
         assertEquals("metadataCache must not be null", e.getMessage());
 
         e = assertThrows(NullPointerException.class, () -> new 
ShareCoordinatorMetadataCacheHelperImpl(
             mock(MetadataCache.class),
             null,
-            mock(ListenerName.class)
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
         ));
         assertEquals("keyToPartitionMapper must not be null", e.getMessage());
 
         e = assertThrows(NullPointerException.class, () -> new 
ShareCoordinatorMetadataCacheHelperImpl(
             mock(MetadataCache.class),
             func,
-            null
+            null,
+            mock(GroupConfigManager.class)
         ));
         assertEquals("interBrokerListenerName must not be null", 
e.getMessage());
+
+        e = assertThrows(NullPointerException.class, () -> new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            func,
+            mock(ListenerName.class),
+            null
+        ));
+        assertEquals("groupConfigManager must not be null", e.getMessage());
     }
 
     @Test
@@ -82,7 +97,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mock(ListenerName.class)
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
         );
 
         
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -104,7 +120,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mock(ListenerName.class)
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
         );
 
         
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -126,7 +143,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mock(ListenerName.class)
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
         );
 
         assertEquals(
@@ -146,7 +164,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mockListenerName
+            mockListenerName,
+            mock(GroupConfigManager.class)
         );
 
         
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -234,7 +253,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mockListenerName
+            mockListenerName,
+            mock(GroupConfigManager.class)
         );
 
         
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -290,7 +310,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mockListenerName
+            mockListenerName,
+            mock(GroupConfigManager.class)
         );
 
         
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -346,7 +367,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mockListenerName
+            mockListenerName,
+            mock(GroupConfigManager.class)
         );
 
         
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -403,7 +425,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mockListenerName
+            mockListenerName,
+            mock(GroupConfigManager.class)
         );
 
         when(mockMetadataCache.getAliveBrokerNodes(
@@ -429,7 +452,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
         ShareCoordinatorMetadataCacheHelper cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
             mockMetadataCache,
             func,
-            mockListenerName
+            mockListenerName,
+            mock(GroupConfigManager.class)
         );
 
         List<Node> nodes = List.of(
@@ -450,4 +474,246 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
 
         verify(mockMetadataCache, 
times(1)).getAliveBrokerNodes(eq(mockListenerName));
     }
+
+    // Tests for shareGroupDlqTopic
+
+    @Test
+    public void testShareGroupDlqTopicReturnsEmptyWhenGroupConfigIsEmpty() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertEquals(Optional.empty(), cache.shareGroupDlqTopic("test-group"));
+        verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+    }
+
+    @Test
+    public void testShareGroupDlqTopicReturnsTopicName() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        GroupConfig mockGroupConfig = mock(GroupConfig.class);
+        when(mockGroupConfig.errorsDLQTopicName()).thenReturn("dlq-topic");
+        
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(mockGroupConfig));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertEquals(Optional.of("dlq-topic"), 
cache.shareGroupDlqTopic("test-group"));
+        verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+    }
+
+    // Tests for isShareGroupDlqCopyRecordEnabled
+
+    @Test
+    public void 
testIsShareGroupDlqCopyRecordEnabledReturnsFalseWhenGroupConfigEmpty() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertFalse(cache.isShareGroupDlqCopyRecordEnabled("test-group"));
+        verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+    }
+
+    @Test
+    public void testIsShareGroupDlqCopyRecordEnabledReturnsTrue() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        GroupConfig mockGroupConfig = mock(GroupConfig.class);
+        when(mockGroupConfig.errorsDLQCopyRecordEnable()).thenReturn(true);
+        
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(mockGroupConfig));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertTrue(cache.isShareGroupDlqCopyRecordEnabled("test-group"));
+        verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+    }
+
+    @Test
+    public void testIsShareGroupDlqCopyRecordEnabledReturnsFalse() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        GroupConfig mockGroupConfig = mock(GroupConfig.class);
+        when(mockGroupConfig.errorsDLQCopyRecordEnable()).thenReturn(false);
+        
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(mockGroupConfig));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertFalse(cache.isShareGroupDlqCopyRecordEnabled("test-group"));
+        verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+    }
+
+    // Tests for isDlqAutoTopicCreateEnabled
+
+    @Test
+    public void testIsDlqAutoTopicCreateEnabledReturnsTrue() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        
when(mockGroupConfigManager.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertTrue(cache.isDlqAutoTopicCreateEnabled());
+        verify(mockGroupConfigManager, times(1)).isDlqAutoTopicCreateEnabled();
+    }
+
+    @Test
+    public void testIsDlqAutoTopicCreateEnabledReturnsFalse() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        
when(mockGroupConfigManager.isDlqAutoTopicCreateEnabled()).thenReturn(false);
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertFalse(cache.isDlqAutoTopicCreateEnabled());
+        verify(mockGroupConfigManager, times(1)).isDlqAutoTopicCreateEnabled();
+    }
+
+    // Tests for shareGroupDlqTopicPrefix
+
+    @Test
+    public void testShareGroupDlqTopicPrefixReturnsPrefix() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        
when(mockGroupConfigManager.shareGroupDlqTopicPrefix()).thenReturn(Optional.of("dlq."));
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertEquals(Optional.of("dlq."), cache.shareGroupDlqTopicPrefix());
+        verify(mockGroupConfigManager, times(1)).shareGroupDlqTopicPrefix();
+    }
+
+    @Test
+    public void testShareGroupDlqTopicPrefixReturnsEmpty() {
+        GroupConfigManager mockGroupConfigManager = 
mock(GroupConfigManager.class);
+        
when(mockGroupConfigManager.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mock(MetadataCache.class),
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mockGroupConfigManager
+        );
+
+        assertEquals(Optional.empty(), cache.shareGroupDlqTopicPrefix());
+        verify(mockGroupConfigManager, times(1)).shareGroupDlqTopicPrefix();
+    }
+
+    // Tests for isDlqEnabledOnTopic
+
+    @Test
+    public void testIsDlqEnabledOnTopicReturnsFalseWhenTopicConfigNull() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        when(mockMetadataCache.topicConfig("test-topic")).thenReturn(null);
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+        verify(mockMetadataCache, times(1)).topicConfig("test-topic");
+    }
+
+    @Test
+    public void testIsDlqEnabledOnTopicReturnsFalseWhenTopicConfigEmpty() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        when(mockMetadataCache.topicConfig("test-topic")).thenReturn(new 
Properties());
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+    }
+
+    @Test
+    public void testIsDlqEnabledOnTopicReturnsFalseWhenConfigValueNotBoolean() 
{
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        Properties props = new Properties();
+        props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
"true");
+        when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+    }
+
+    @Test
+    public void testIsDlqEnabledOnTopicReturnsTrue() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        Properties props = new Properties();
+        props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
true);
+        when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
+    }
+
+    @Test
+    public void testIsDlqEnabledOnTopicReturnsFalseValue() {
+        MetadataCache mockMetadataCache = mock(MetadataCache.class);
+        Properties props = new Properties();
+        props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, 
false);
+        when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+
+        ShareCoordinatorMetadataCacheHelperImpl cache = new 
ShareCoordinatorMetadataCacheHelperImpl(
+            mockMetadataCache,
+            sharePartitionKey -> 0,
+            mock(ListenerName.class),
+            mock(GroupConfigManager.class)
+        );
+
+        assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 310d079205c..9335b9ea91e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -90,6 +90,14 @@ public class GroupConfigManager implements AutoCloseable {
         return List.copyOf(configMap.keySet());
     }
 
+    public Optional<String> shareGroupDlqTopicPrefix() {
+        return 
Optional.ofNullable(groupCoordinatorConfig.errorsDLQTopicNamePrefix());
+    }
+
+    public boolean isDlqAutoTopicCreateEnabled() {
+        return groupCoordinatorConfig.errorsDLQAutoCreateTopicsEnable();
+    }
+
     /**
      * Remove all group configs.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index 062b7f861b8..9e842de6c7a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -130,6 +130,56 @@ public class GroupConfigManagerTest {
         assertTrue(configManager.groupIds().contains(groupId2));
     }
 
+    // Tests for isDlqAutoTopicCreateEnabled
+
+    @Test
+    public void testIsDlqAutoTopicCreateEnabledDefault() {
+        assertFalse(configManager.isDlqAutoTopicCreateEnabled());
+    }
+
+    @Test
+    public void testIsDlqAutoTopicCreateEnabledTrue() {
+        Map<String, Object> overrides = new HashMap<>();
+        
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
 true);
+        configManager = createConfigManager(overrides);
+
+        assertTrue(configManager.isDlqAutoTopicCreateEnabled());
+    }
+
+    @Test
+    public void testIsDlqAutoTopicCreateEnabledFalse() {
+        Map<String, Object> overrides = new HashMap<>();
+        
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
 false);
+        configManager = createConfigManager(overrides);
+
+        assertFalse(configManager.isDlqAutoTopicCreateEnabled());
+    }
+
+    // Tests for shareGroupDlqTopicPrefix
+
+    @Test
+    public void testShareGroupDlqTopicPrefixDefault() {
+        assertEquals(Optional.of("dlq."), 
configManager.shareGroupDlqTopicPrefix());
+    }
+
+    @Test
+    public void testShareGroupDlqTopicPrefixCustom() {
+        Map<String, Object> overrides = new HashMap<>();
+        
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG,
 "custom-dlq-");
+        configManager = createConfigManager(overrides);
+
+        assertEquals(Optional.of("custom-dlq-"), 
configManager.shareGroupDlqTopicPrefix());
+    }
+
+    @Test
+    public void testShareGroupDlqTopicPrefixEmpty() {
+        Map<String, Object> overrides = new HashMap<>();
+        
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG,
 "");
+        configManager = createConfigManager(overrides);
+
+        assertEquals(Optional.of(""), 
configManager.shareGroupDlqTopicPrefix());
+    }
+
     public static GroupConfigManager createConfigManager() {
         return createConfigManager(new HashMap<>());
     }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
new file mode 100644
index 00000000000..e670676831f
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.common.Node;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface encapsulating metadata cache related methods which are
+ * required by share group DLQ operations. This will be helpful in testing
+ * and keeping implementations manageable.
+ */
+public interface ShareGroupDLQMetadataCacheHelper {
+    /**
+     * Return optional of string representing of DLQ topic.
+     *
+     * @param groupId Id of the share group
+     * @return  Optional of string representing of DLQ topic if set, empty 
otherwise
+     */
+    Optional<String> shareGroupDlqTopic(String groupId);
+
+    /**
+     * Check if DLQ dynamic config is set on the topic to mark it available 
for DLQ writes.
+     *
+     * @param topic The name of the topic
+     * @return  Boolean which is true when DLQ is set on the topic, false 
otherwise
+     */
+    boolean isDlqEnabledOnTopic(String topic);
+
+    /**
+     * Check if the cluster config to auto create DLQ topics is enabled.
+     *
+     * @return Boolean which is true when DLQ topic auto create cluster config 
is set, false otherwise
+     */
+    boolean isDlqAutoTopicCreateEnabled();
+
+    /**
+     * Return optional of string representing the configured DLQ prefix.
+     *
+     * @return  Optional of string representing DLQ prefix if configured, 
empty otherwise
+     */
+    Optional<String> shareGroupDlqTopicPrefix();
+
+    /**
+     * Check is copy record data into DLQ topic is enabled.
+     *
+     * @param groupId The id of the share group
+     * @return  Boolean which is true if config is set, false otherwise
+     */
+    boolean isShareGroupDlqCopyRecordEnabled(String groupId);
+
+    /**
+     * Check if a topic is present in the metadata cache.
+     *
+     * @param topic The name of the topic
+     * @return  Boolean which is true is topic exists, false otherwise
+     */
+    boolean containsTopic(String topic);
+
+    /**
+     * Get all nodes in the kafka cluster encapsulated in the {@link Node} 
object.
+     *
+     * @return  List of nodes representing the cluster nodes
+     */
+    List<Node> getClusterNodes();
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index 771c18720a5..b55996b55a0 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -21,16 +21,24 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import 
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -47,14 +55,9 @@ public class ShareGroupDLQStateManager {
     private final SendThread sender;
     private final Time time;
     private final Timer timer;
-    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
-
-    public enum RPCType {
-        PRODUCE,
-        CREATE_TOPIC
-    }
+    private final ShareGroupDLQMetadataCacheHelper cacheHelper;
 
-    public ShareGroupDLQStateManager(KafkaClient client, 
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+    public ShareGroupDLQStateManager(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
         if (client == null) {
             throw new IllegalArgumentException("Kafkaclient must not be 
null.");
         }
@@ -105,21 +108,80 @@ public class ShareGroupDLQStateManager {
      * @return A future completing normally on successful DLQ, exceptionally 
otherwise.
      */
     public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
-        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param, future);
         sender.enqueue(requestHandler);
-        return requestHandler.result().thenAccept(response -> {
-        });
+        return future;
     }
 
     private abstract class ShareGroupDLQStateManagerHandler implements 
RequestCompletionHandler {
+        private final ShareGroupDLQRecordParameter param;
+
+        ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+            this.param = param;
+        }
+
         protected abstract AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder();
 
         protected abstract CompletableFuture<? extends AbstractResponse> 
result();
+
+        protected abstract String name();
+
+        protected abstract void createTopicErrorResponse(Exception exception);
+
+        protected AbstractRequest.Builder<CreateTopicsRequest> 
createTopicBuilder() {
+            return new CreateTopicsRequest.Builder(new 
CreateTopicsRequestData());
+        }
+
+        public Optional<Throwable> validateDlqTopic() {
+            Optional<String> topicNameOpt = 
cacheHelper.shareGroupDlqTopic(param.groupId());
+            Optional<String> topicPrefix = 
cacheHelper.shareGroupDlqTopicPrefix();
+
+            // Verify that DLQ topic for the share group is set and is 
correctly named.
+            if (topicNameOpt.isEmpty()) {
+                return Optional.of(new 
ConfigException(String.format("Configured DLQ topic name in share group: %s is 
empty.", param.groupId())));
+            } else if (!topicNameOpt.get().startsWith("__")) {
+                return Optional.of(new 
ConfigException(String.format("Configured DLQ topic name in share group: %s 
cannot start with __, topic: %s.", param.groupId(), topicNameOpt.get())));
+            }
+
+            String topicName = topicNameOpt.get();
+
+            // Verify that DLQ is enabled on a correctly named topic, 
configured on a share group.
+            if (cacheHelper.containsTopic(topicName) && 
!cacheHelper.isDlqEnabledOnTopic(topicName)) {
+                return Optional.of(new ConfigException(String.format("DLQ is 
not enabled on configured DLQ topic for share group: %s, topic: %s.", 
param.groupId(), topicName)));
+            }
+
+            // Verify that for a non-existent correctly named DLQ topic, auto 
create should be enabled.
+            if (!cacheHelper.containsTopic(topicName) && 
!cacheHelper.isDlqAutoTopicCreateEnabled()) {
+                return Optional.of(new ConfigException(String.format("DLQ 
topic does not exist and auto create is disabled on cluster for share group: 
%s, topic: %s.", param.groupId(), topicName)));
+            }
+
+            // Verify that if configured, the DLQ topic name prefix aligns 
with the topic name.
+            return topicPrefix.map(prefix -> {
+                if (!prefix.isEmpty() && !topicName.startsWith(prefix)) {
+                    return new ConfigException(String.format("Configured DLQ 
topic name does not comply with the DLQ topic prefix in share group: %s, topic: 
%s, prefix: %s.", param.groupId(), topicName, prefix));
+                }
+                return null;
+            });
+        }
+
+        public ShareGroupDLQRecordParameter recordParam() {
+            return param;
+        }
+
+        public boolean dlqTopicExists() {
+            Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
+            return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+        }
     }
 
     private class ProduceRequestHandler extends 
ShareGroupDLQStateManagerHandler {
+        private final CompletableFuture<Void> result;
+        private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
 
-        ProduceRequestHandler(ShareGroupDLQRecordParameter param) {
+        public ProduceRequestHandler(ShareGroupDLQRecordParameter param, 
CompletableFuture<Void> result) {
+            super(param);
+            this.result = result;
         }
 
         @Override
@@ -132,13 +194,23 @@ public class ShareGroupDLQStateManager {
             return CompletableFuture.completedFuture(null);
         }
 
+        @Override
+        protected String name() {
+            return "ProduceRequestHandler";
+        }
+
+        @Override
+        protected void createTopicErrorResponse(Exception exception) {
+            this.result.completeExceptionally(exception);
+        }
+
         @Override
         public void onComplete(ClientResponse response) {
 
         }
     }
 
-    private static class SendThread extends InterBrokerSendThread {
+    private class SendThread extends InterBrokerSendThread {
         private final 
ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler>
 queue = new ConcurrentLinkedQueue<>();
         private final Random random;
 
@@ -149,12 +221,48 @@ public class ShareGroupDLQStateManager {
 
         @Override
         public Collection<RequestAndCompletionHandler> generateRequests() {
+            if (!queue.isEmpty()) {
+                ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler 
handler = queue.poll();
+                // At this point either a correctly named and configured DLQ 
topic exists or
+                // one is configured but does non-exist. We have already 
validated that the
+                // auto create should be enabled, in that case.
+                if (!handler.dlqTopicExists()) {
+                    // We need to send RPC to create the topic
+                    Node randomNode = randomNode();
+                    if (randomNode == Node.noNode()) {
+                        log.error("Unable to find node to use for coordinator 
lookup.");
+                        // fatal failure, cannot retry or progress
+                        // fail the RPC
+                        
handler.createTopicErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
+                        return List.of();
+                    }
+                    return List.of(new RequestAndCompletionHandler(
+                        time.milliseconds(),
+                        randomNode,
+                        handler.createTopicBuilder(),
+                        handler
+                    ));
+                }
+            }
             return List.of();
         }
 
         public void 
enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
+            Optional<Throwable> exp = handler.validateDlqTopic();
+            if (exp.isPresent()) {
+                handler.result().completeExceptionally(exp.get());
+                return;
+            }
             queue.add(handler);
             wakeup();
         }
+
+        private Node randomNode() {
+            List<Node> nodes = cacheHelper.getClusterNodes();
+            if (nodes == null || nodes.isEmpty()) {
+                return Node.noNode();
+            }
+            return nodes.get(random.nextInt(nodes.size()));
+        }
     }
 }


Reply via email to