This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8889765c5ba KAFKA-20549: Intd new cache helper interface for DLQ
manager. [2/N] (#22241)
8889765c5ba is described below
commit 8889765c5baebe0e1bd181115b00be65f72dd220
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue May 12 14:23:18 2026 +0530
KAFKA-20549: Intd new cache helper interface for DLQ manager. [2/N] (#22241)
* Add new interface for metadata cache helper for
`ShareGroupDLQStateManager`.
* The impl of the new interface exposes logic to fetch new dynamic
configs added as part of KIP-1191.
* Additional plumbing in `ShareGroupDLQStateManager` is also added.
* Expose cluster level DLQ configs in `GroupConfigManager`.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
.../ShareCoordinatorMetadataCacheHelperImpl.java | 47 +++-
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
...hareCoordinatorMetadataCacheHelperImplTest.java | 290 ++++++++++++++++++++-
.../coordinator/group/GroupConfigManager.java | 8 +
.../coordinator/group/GroupConfigManagerTest.java | 50 ++++
.../dlq/ShareGroupDLQMetadataCacheHelper.java | 83 ++++++
.../share/dlq/ShareGroupDLQStateManager.java | 134 +++++++++-
7 files changed, 586 insertions(+), 28 deletions(-)
diff --git
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
index ed9be995415..ad556514891 100644
---
a/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
+++
b/core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java
@@ -18,12 +18,16 @@
package kafka.server.share;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQMetadataCacheHelper;
import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.slf4j.Logger;
@@ -33,23 +37,27 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
-public class ShareCoordinatorMetadataCacheHelperImpl implements
ShareCoordinatorMetadataCacheHelper {
+public class ShareCoordinatorMetadataCacheHelperImpl implements
ShareCoordinatorMetadataCacheHelper, ShareGroupDLQMetadataCacheHelper {
private final MetadataCache metadataCache;
private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
private final ListenerName interBrokerListenerName;
+ private final GroupConfigManager groupConfigManager;
private final Logger log =
LoggerFactory.getLogger(ShareCoordinatorMetadataCacheHelperImpl.class);
public ShareCoordinatorMetadataCacheHelperImpl(
MetadataCache metadataCache,
Function<SharePartitionKey, Integer> keyToPartitionMapper,
- ListenerName interBrokerListenerName
+ ListenerName interBrokerListenerName,
+ GroupConfigManager groupConfigManager
) {
this.metadataCache = Objects.requireNonNull(metadataCache,
"metadataCache must not be null");
this.keyToPartitionMapper =
Objects.requireNonNull(keyToPartitionMapper, "keyToPartitionMapper must not be
null");
this.interBrokerListenerName =
Objects.requireNonNull(interBrokerListenerName, "interBrokerListenerName must
not be null");
+ this.groupConfigManager = Objects.requireNonNull(groupConfigManager,
"groupConfigManager must not be null");
}
@Override
@@ -62,6 +70,41 @@ public class ShareCoordinatorMetadataCacheHelperImpl
implements ShareCoordinator
return false;
}
+ @Override
+ public Optional<String> shareGroupDlqTopic(String groupId) {
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.map(GroupConfig::errorsDLQTopicName);
+ }
+
+ @Override
+ public boolean isDlqAutoTopicCreateEnabled() {
+ return groupConfigManager.isDlqAutoTopicCreateEnabled();
+ }
+
+ @Override
+ public Optional<String> shareGroupDlqTopicPrefix() {
+ return groupConfigManager.shareGroupDlqTopicPrefix();
+ }
+
+ /**
+ * Reports whether the topic-level DLQ enable config is set to true for the given topic.
+ * Returns false when the metadata cache has no config (null or empty) for the topic,
+ * when the key is absent, or when the stored value is not a {@link Boolean}.
+ *
+ * @param topic The name of the topic
+ * @return true only if the stored config value is the Boolean true, false otherwise
+ */
+ @Override
+ public boolean isDlqEnabledOnTopic(String topic) {
+ Properties props = metadataCache.topicConfig(topic);
+ if (props == null || props.isEmpty()) {
+ return false;
+ }
+ Object isEnabled =
props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
+ // NOTE(review): only a Boolean instance is honoured here; a String such as "true" yields false.
+ // Confirm that topicConfig() stores this value as a Boolean rather than as text.
+ if (isEnabled instanceof Boolean) {
+ return (boolean) isEnabled;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isShareGroupDlqCopyRecordEnabled(String groupId) {
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return
groupConfig.map(GroupConfig::errorsDLQCopyRecordEnable).orElse(false);
+ }
+
@Override
public Node getShareCoordinator(SharePartitionKey key, String
internalTopicName) {
try {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 1646c8c6a83..681848ad0ef 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -730,7 +730,7 @@ class BrokerServer(
.newInstance(
new PersisterStateManager(
NetworkUtils.buildNetworkClient("Persister", config, metrics,
Time.SYSTEM, new LogContext(s"[Persister broker=${config.brokerId}]")),
- new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName),
+ new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key
=> shareCoordinator.partitionFor(key), config.interBrokerListenerName,
groupConfigManager),
Time.SYSTEM,
shareGroupTimer
)
diff --git
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
index f445e65f985..69e8e235dd2 100644
---
a/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
+++
b/core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java
@@ -19,11 +19,14 @@ package kafka.server.share;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.share.SharePartitionKey;
import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
@@ -32,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,23 +57,34 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
Exception e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
null,
func,
- mock(ListenerName.class)
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
));
assertEquals("metadataCache must not be null", e.getMessage());
e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
mock(MetadataCache.class),
null,
- mock(ListenerName.class)
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
));
assertEquals("keyToPartitionMapper must not be null", e.getMessage());
e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
mock(MetadataCache.class),
func,
- null
+ null,
+ mock(GroupConfigManager.class)
));
assertEquals("interBrokerListenerName must not be null",
e.getMessage());
+
+ e = assertThrows(NullPointerException.class, () -> new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ func,
+ mock(ListenerName.class),
+ null
+ ));
+ assertEquals("groupConfigManager must not be null", e.getMessage());
}
@Test
@@ -82,7 +97,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mock(ListenerName.class)
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -104,7 +120,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mock(ListenerName.class)
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -126,7 +143,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mock(ListenerName.class)
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
);
assertEquals(
@@ -146,7 +164,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mockListenerName
+ mockListenerName,
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -234,7 +253,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mockListenerName
+ mockListenerName,
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -290,7 +310,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mockListenerName
+ mockListenerName,
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -346,7 +367,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mockListenerName
+ mockListenerName,
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.contains(eq(Topic.SHARE_GROUP_STATE_TOPIC_NAME)))
@@ -403,7 +425,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mockListenerName
+ mockListenerName,
+ mock(GroupConfigManager.class)
);
when(mockMetadataCache.getAliveBrokerNodes(
@@ -429,7 +452,8 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
ShareCoordinatorMetadataCacheHelper cache = new
ShareCoordinatorMetadataCacheHelperImpl(
mockMetadataCache,
func,
- mockListenerName
+ mockListenerName,
+ mock(GroupConfigManager.class)
);
List<Node> nodes = List.of(
@@ -450,4 +474,246 @@ public class ShareCoordinatorMetadataCacheHelperImplTest {
verify(mockMetadataCache,
times(1)).getAliveBrokerNodes(eq(mockListenerName));
}
+
+ // Tests for shareGroupDlqTopic
+
+ @Test
+ public void testShareGroupDlqTopicReturnsEmptyWhenGroupConfigIsEmpty() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertEquals(Optional.empty(), cache.shareGroupDlqTopic("test-group"));
+ verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+ }
+
+ @Test
+ public void testShareGroupDlqTopicReturnsTopicName() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+ GroupConfig mockGroupConfig = mock(GroupConfig.class);
+ when(mockGroupConfig.errorsDLQTopicName()).thenReturn("dlq-topic");
+
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(mockGroupConfig));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertEquals(Optional.of("dlq-topic"),
cache.shareGroupDlqTopic("test-group"));
+ verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+ }
+
+ // Tests for isShareGroupDlqCopyRecordEnabled
+
+ @Test
+ public void
testIsShareGroupDlqCopyRecordEnabledReturnsFalseWhenGroupConfigEmpty() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertFalse(cache.isShareGroupDlqCopyRecordEnabled("test-group"));
+ verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+ }
+
+ @Test
+ public void testIsShareGroupDlqCopyRecordEnabledReturnsTrue() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+ GroupConfig mockGroupConfig = mock(GroupConfig.class);
+ when(mockGroupConfig.errorsDLQCopyRecordEnable()).thenReturn(true);
+
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(mockGroupConfig));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertTrue(cache.isShareGroupDlqCopyRecordEnabled("test-group"));
+ verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+ }
+
+ @Test
+ public void testIsShareGroupDlqCopyRecordEnabledReturnsFalse() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+ GroupConfig mockGroupConfig = mock(GroupConfig.class);
+ when(mockGroupConfig.errorsDLQCopyRecordEnable()).thenReturn(false);
+
when(mockGroupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(mockGroupConfig));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertFalse(cache.isShareGroupDlqCopyRecordEnabled("test-group"));
+ verify(mockGroupConfigManager, times(1)).groupConfig("test-group");
+ }
+
+ // Tests for isDlqAutoTopicCreateEnabled
+
+ @Test
+ public void testIsDlqAutoTopicCreateEnabledReturnsTrue() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+
when(mockGroupConfigManager.isDlqAutoTopicCreateEnabled()).thenReturn(true);
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertTrue(cache.isDlqAutoTopicCreateEnabled());
+ verify(mockGroupConfigManager, times(1)).isDlqAutoTopicCreateEnabled();
+ }
+
+ @Test
+ public void testIsDlqAutoTopicCreateEnabledReturnsFalse() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+
when(mockGroupConfigManager.isDlqAutoTopicCreateEnabled()).thenReturn(false);
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertFalse(cache.isDlqAutoTopicCreateEnabled());
+ verify(mockGroupConfigManager, times(1)).isDlqAutoTopicCreateEnabled();
+ }
+
+ // Tests for shareGroupDlqTopicPrefix
+
+ @Test
+ public void testShareGroupDlqTopicPrefixReturnsPrefix() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+
when(mockGroupConfigManager.shareGroupDlqTopicPrefix()).thenReturn(Optional.of("dlq."));
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertEquals(Optional.of("dlq."), cache.shareGroupDlqTopicPrefix());
+ verify(mockGroupConfigManager, times(1)).shareGroupDlqTopicPrefix();
+ }
+
+ @Test
+ public void testShareGroupDlqTopicPrefixReturnsEmpty() {
+ GroupConfigManager mockGroupConfigManager =
mock(GroupConfigManager.class);
+
when(mockGroupConfigManager.shareGroupDlqTopicPrefix()).thenReturn(Optional.empty());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mock(MetadataCache.class),
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mockGroupConfigManager
+ );
+
+ assertEquals(Optional.empty(), cache.shareGroupDlqTopicPrefix());
+ verify(mockGroupConfigManager, times(1)).shareGroupDlqTopicPrefix();
+ }
+
+ // Tests for isDlqEnabledOnTopic
+
+ @Test
+ public void testIsDlqEnabledOnTopicReturnsFalseWhenTopicConfigNull() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ when(mockMetadataCache.topicConfig("test-topic")).thenReturn(null);
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ verify(mockMetadataCache, times(1)).topicConfig("test-topic");
+ }
+
+ @Test
+ public void testIsDlqEnabledOnTopicReturnsFalseWhenTopicConfigEmpty() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ when(mockMetadataCache.topicConfig("test-topic")).thenReturn(new
Properties());
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ }
+
+ @Test
+ public void testIsDlqEnabledOnTopicReturnsFalseWhenConfigValueNotBoolean()
{
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ Properties props = new Properties();
+ props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
"true");
+ when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ }
+
+ @Test
+ public void testIsDlqEnabledOnTopicReturnsTrue() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ Properties props = new Properties();
+ props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
true);
+ when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
+ }
+
+ @Test
+ public void testIsDlqEnabledOnTopicReturnsFalseValue() {
+ MetadataCache mockMetadataCache = mock(MetadataCache.class);
+ Properties props = new Properties();
+ props.put(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG,
false);
+ when(mockMetadataCache.topicConfig("test-topic")).thenReturn(props);
+
+ ShareCoordinatorMetadataCacheHelperImpl cache = new
ShareCoordinatorMetadataCacheHelperImpl(
+ mockMetadataCache,
+ sharePartitionKey -> 0,
+ mock(ListenerName.class),
+ mock(GroupConfigManager.class)
+ );
+
+ assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 310d079205c..9335b9ea91e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -90,6 +90,14 @@ public class GroupConfigManager implements AutoCloseable {
return List.copyOf(configMap.keySet());
}
+ public Optional<String> shareGroupDlqTopicPrefix() {
+ return
Optional.ofNullable(groupCoordinatorConfig.errorsDLQTopicNamePrefix());
+ }
+
+ public boolean isDlqAutoTopicCreateEnabled() {
+ return groupCoordinatorConfig.errorsDLQAutoCreateTopicsEnable();
+ }
+
/**
* Remove all group configs.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index 062b7f861b8..9e842de6c7a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -130,6 +130,56 @@ public class GroupConfigManagerTest {
assertTrue(configManager.groupIds().contains(groupId2));
}
+ // Tests for isDlqAutoTopicCreateEnabled
+
+ @Test
+ public void testIsDlqAutoTopicCreateEnabledDefault() {
+ assertFalse(configManager.isDlqAutoTopicCreateEnabled());
+ }
+
+ @Test
+ public void testIsDlqAutoTopicCreateEnabledTrue() {
+ Map<String, Object> overrides = new HashMap<>();
+
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
true);
+ configManager = createConfigManager(overrides);
+
+ assertTrue(configManager.isDlqAutoTopicCreateEnabled());
+ }
+
+ @Test
+ public void testIsDlqAutoTopicCreateEnabledFalse() {
+ Map<String, Object> overrides = new HashMap<>();
+
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_AUTO_CREATE_TOPICS_ENABLE_CONFIG,
false);
+ configManager = createConfigManager(overrides);
+
+ assertFalse(configManager.isDlqAutoTopicCreateEnabled());
+ }
+
+ // Tests for shareGroupDlqTopicPrefix
+
+ @Test
+ public void testShareGroupDlqTopicPrefixDefault() {
+ assertEquals(Optional.of("dlq."),
configManager.shareGroupDlqTopicPrefix());
+ }
+
+ @Test
+ public void testShareGroupDlqTopicPrefixCustom() {
+ Map<String, Object> overrides = new HashMap<>();
+
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG,
"custom-dlq-");
+ configManager = createConfigManager(overrides);
+
+ assertEquals(Optional.of("custom-dlq-"),
configManager.shareGroupDlqTopicPrefix());
+ }
+
+ @Test
+ public void testShareGroupDlqTopicPrefixEmpty() {
+ Map<String, Object> overrides = new HashMap<>();
+
overrides.put(GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG,
"");
+ configManager = createConfigManager(overrides);
+
+ assertEquals(Optional.of(""),
configManager.shareGroupDlqTopicPrefix());
+ }
+
public static GroupConfigManager createConfigManager() {
return createConfigManager(new HashMap<>());
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
new file mode 100644
index 00000000000..e670676831f
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQMetadataCacheHelper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.common.Node;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Interface encapsulating metadata cache related methods which are
+ * required by share group DLQ operations. This will be helpful in testing
+ * and keeping implementations manageable.
+ */
+public interface ShareGroupDLQMetadataCacheHelper {
+ /**
+ * Return the name of the DLQ topic configured for the share group, if any.
+ *
+ * @param groupId Id of the share group
+ * @return Optional holding the DLQ topic name if set, empty otherwise
+ */
+ Optional<String> shareGroupDlqTopic(String groupId);
+
+ /**
+ * Check if the DLQ dynamic config is set on the topic to mark it available for DLQ writes.
+ *
+ * @param topic The name of the topic
+ * @return true when DLQ is enabled on the topic, false otherwise
+ */
+ boolean isDlqEnabledOnTopic(String topic);
+
+ /**
+ * Check if the cluster config to auto create DLQ topics is enabled.
+ *
+ * @return true when the DLQ topic auto create cluster config is set, false otherwise
+ */
+ boolean isDlqAutoTopicCreateEnabled();
+
+ /**
+ * Return the configured DLQ topic name prefix, if any.
+ *
+ * @return Optional holding the DLQ topic prefix if configured, empty otherwise
+ */
+ Optional<String> shareGroupDlqTopicPrefix();
+
+ /**
+ * Check if copying record data into the DLQ topic is enabled for the share group.
+ *
+ * @param groupId The id of the share group
+ * @return true if the config is set, false otherwise
+ */
+ boolean isShareGroupDlqCopyRecordEnabled(String groupId);
+
+ /**
+ * Check if a topic is present in the metadata cache.
+ *
+ * @param topic The name of the topic
+ * @return true if the topic exists, false otherwise
+ */
+ boolean containsTopic(String topic);
+
+ /**
+ * Get all nodes in the Kafka cluster, each encapsulated in a {@link Node} object.
+ *
+ * @return List of nodes representing the cluster nodes
+ */
+ List<Node> getClusterNodes();
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index 771c18720a5..b55996b55a0 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -21,16 +21,24 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -47,14 +55,9 @@ public class ShareGroupDLQStateManager {
private final SendThread sender;
private final Time time;
private final Timer timer;
- private final ShareCoordinatorMetadataCacheHelper cacheHelper;
-
- public enum RPCType {
- PRODUCE,
- CREATE_TOPIC
- }
+ private final ShareGroupDLQMetadataCacheHelper cacheHelper;
- public ShareGroupDLQStateManager(KafkaClient client,
ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+ public ShareGroupDLQStateManager(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
if (client == null) {
throw new IllegalArgumentException("Kafkaclient must not be
null.");
}
@@ -105,21 +108,80 @@ public class ShareGroupDLQStateManager {
* @return A future completing normally on successful DLQ, exceptionally
otherwise.
*/
public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
- ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future);
sender.enqueue(requestHandler);
- return requestHandler.result().thenAccept(response -> {
- });
+ return future;
}
private abstract class ShareGroupDLQStateManagerHandler implements
RequestCompletionHandler {
+ private final ShareGroupDLQRecordParameter param;
+
+ ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+ this.param = param;
+ }
+
protected abstract AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder();
protected abstract CompletableFuture<? extends AbstractResponse>
result();
+
+ protected abstract String name();
+
+ protected abstract void createTopicErrorResponse(Exception exception);
+
+ protected AbstractRequest.Builder<CreateTopicsRequest>
createTopicBuilder() {
+ return new CreateTopicsRequest.Builder(new
CreateTopicsRequestData());
+ }
+
+ /**
+ * Validates the DLQ topic configured for the share group of this handler's record parameter.
+ * The checks run in order and the first failure is returned:
+ * a topic name must be configured; the name must not start with "__"; an existing topic
+ * must have DLQ enabled; a missing topic requires auto create to be enabled; and the name
+ * must start with the configured DLQ prefix when that prefix is non-empty.
+ *
+ * @return Optional holding a {@link ConfigException} describing the first failed check,
+ * empty when the topic passes every check
+ */
+ public Optional<Throwable> validateDlqTopic() {
+ Optional<String> topicNameOpt = cacheHelper.shareGroupDlqTopic(param.groupId());
+ Optional<String> topicPrefix = cacheHelper.shareGroupDlqTopicPrefix();
+
+ // Verify that DLQ topic for the share group is set.
+ if (topicNameOpt.isEmpty()) {
+ return Optional.of(new ConfigException(String.format("Configured DLQ topic name in share group: %s is empty.", param.groupId())));
+ }
+
+ String topicName = topicNameOpt.get();
+
+ // Names starting with "__" are rejected, as the error message states. The check used to be
+ // negated, which rejected every name that did NOT start with "__" and so made it impossible
+ // to satisfy both this check and a non-empty prefix such as "dlq.".
+ if (topicName.startsWith("__")) {
+ return Optional.of(new ConfigException(String.format("Configured DLQ topic name in share group: %s cannot start with __, topic: %s.", param.groupId(), topicName)));
+ }
+
+ // Look the topic up once so both existence-dependent checks see the same answer.
+ boolean topicExists = cacheHelper.containsTopic(topicName);
+
+ // Verify that DLQ is enabled on a correctly named topic, configured on a share group.
+ if (topicExists && !cacheHelper.isDlqEnabledOnTopic(topicName)) {
+ return Optional.of(new ConfigException(String.format("DLQ is not enabled on configured DLQ topic for share group: %s, topic: %s.", param.groupId(), topicName)));
+ }
+
+ // Verify that for a non-existent correctly named DLQ topic, auto create should be enabled.
+ if (!topicExists && !cacheHelper.isDlqAutoTopicCreateEnabled()) {
+ return Optional.of(new ConfigException(String.format("DLQ topic does not exist and auto create is disabled on cluster for share group: %s, topic: %s.", param.groupId(), topicName)));
+ }
+
+ // Verify that if configured, the DLQ topic name prefix aligns with the topic name.
+ // filter() keeps only a violating prefix, so no lambda has to return null.
+ return topicPrefix
+ .filter(prefix -> !prefix.isEmpty() && !topicName.startsWith(prefix))
+ .map(prefix -> new ConfigException(String.format("Configured DLQ topic name does not comply with the DLQ topic prefix in share group: %s, topic: %s, prefix: %s.", param.groupId(), topicName, prefix)));
+ }
+
+ public ShareGroupDLQRecordParameter recordParam() {
+ return param;
+ }
+
+ public boolean dlqTopicExists() {
+ Optional<String> shareGroupDlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
+ return
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+ }
}
private class ProduceRequestHandler extends
ShareGroupDLQStateManagerHandler {
+ private final CompletableFuture<Void> result;
+ private static final Logger LOG =
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
- ProduceRequestHandler(ShareGroupDLQRecordParameter param) {
+ public ProduceRequestHandler(ShareGroupDLQRecordParameter param,
CompletableFuture<Void> result) {
+ super(param);
+ this.result = result;
}
@Override
@@ -132,13 +194,23 @@ public class ShareGroupDLQStateManager {
return CompletableFuture.completedFuture(null);
}
+ @Override
+ protected String name() {
+ return "ProduceRequestHandler";
+ }
+
+ @Override
+ protected void createTopicErrorResponse(Exception exception) {
+ this.result.completeExceptionally(exception);
+ }
+
@Override
public void onComplete(ClientResponse response) {
}
}
- private static class SendThread extends InterBrokerSendThread {
+ private class SendThread extends InterBrokerSendThread {
private final
ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler>
queue = new ConcurrentLinkedQueue<>();
private final Random random;
@@ -149,12 +221,48 @@ public class ShareGroupDLQStateManager {
+ /**
+ * Polls at most one queued handler per invocation. If that handler's DLQ topic is not yet
+ * present in the metadata cache, returns a single create-topic request addressed to a
+ * randomly chosen cluster node; if no node is available the handler is failed with
+ * BROKER_NOT_AVAILABLE and nothing is sent.
+ *
+ * @return A collection with one create-topic request, or an empty list
+ */
@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
+ if (!queue.isEmpty()) {
+ ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler
handler = queue.poll();
+ // At this point either a correctly named and configured DLQ
topic exists or
+ // one is configured but does not exist. We have already
validated that the
+ // auto create should be enabled, in that case.
+ // NOTE(review): when the topic already exists the polled handler is neither sent
+ // nor re-queued in this method - confirm the produce path is handled elsewhere.
+ if (!handler.dlqTopicExists()) {
+ // We need to send RPC to create the topic
+ Node randomNode = randomNode();
+ // randomNode() returns Node.noNode() itself when the cache reports no nodes.
+ if (randomNode == Node.noNode()) {
+ log.error("Unable to find node to use for coordinator
lookup.");
+ // fatal failure, cannot retry or progress
+ // fail the RPC
+
handler.createTopicErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
+ return List.of();
+ }
+ return List.of(new RequestAndCompletionHandler(
+ time.milliseconds(),
+ randomNode,
+ handler.createTopicBuilder(),
+ handler
+ ));
+ }
+ }
return List.of();
}
+ /**
+ * Validates the handler's DLQ topic and, if valid, queues the handler and wakes the
+ * send thread. A handler that fails validation is not queued.
+ *
+ * @param handler The handler to validate and enqueue
+ */
public void
enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
+ Optional<Throwable> exp = handler.validateDlqTopic();
+ if (exp.isPresent()) {
+ // NOTE(review): this completes the future returned by handler.result().
+ // ProduceRequestHandler keeps the caller's future in its own `result` field and
+ // completes it in createTopicErrorResponse(); confirm result() returns that same
+ // future, otherwise the caller of dlq() never observes this validation failure.
+ handler.result().completeExceptionally(exp.get());
+ return;
+ }
queue.add(handler);
wakeup();
}
+
+ private Node randomNode() {
+ List<Node> nodes = cacheHelper.getClusterNodes();
+ if (nodes == null || nodes.isEmpty()) {
+ return Node.noNode();
+ }
+ return nodes.get(random.nextInt(nodes.size()));
+ }
}
}