This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f2f31100c4b KAFKA-20611: Stitch share group DLQ manager with the main code. (#22372)
f2f31100c4b is described below

commit f2f31100c4b7aef96a86e77d5ecf47ef7f9f3468
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri May 29 03:44:48 2026 +0530

    KAFKA-20611: Stitch share group DLQ manager with the main code. (#22372)
    
    * Add ShareGroupDLQManager instance creation code in BrokerServer and
    pass along the instance to SharePartitionManager to be handed over to
    SharePartition.
    
    NOTE: Merge after https://github.com/apache/kafka/pull/22368
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 22 +++++++------
 .../kafka/server/share/SharePartitionManager.java  | 26 ++++++++++++----
 .../src/main/scala/kafka/server/BrokerServer.scala | 36 +++++++++++++++++++++-
 .../server/share/SharePartitionManagerTest.java    | 11 ++++++-
 .../kafka/server/share/SharePartitionTest.java     | 12 ++++++--
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  1 +
 .../group/modern/share/ShareGroupConfig.java       | 14 ++++++++-
 7 files changed, 102 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 02d0cb14413..bfa209b4819 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
-import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
@@ -330,15 +329,16 @@ public class SharePartition {
     private long fetchLockIdleDurationMs;
 
     /**
-     * Reference to the dlq manager implementation.
+     * Supplier to toggle DLQ support.
      */
-    private final ShareGroupDLQManager shareGroupDLQ = new 
NoOpShareGroupDLQManager();
+    private final Supplier<Boolean> shareGroupDlqEnableSupplier;
 
     /**
-     * Supplier to toggle dlq support.
+     * Reference to the DLQ manager implementation.
      */
-    private final Supplier<Boolean> shareGroupDlqEnableSupplier;
+    private final ShareGroupDLQManager shareGroupDLQManager;
 
+    @SuppressWarnings("ParameterNumber")
     SharePartition(
         String groupId,
         TopicIdPartition topicIdPartition,
@@ -352,11 +352,13 @@ public class SharePartition {
         ReplicaManager replicaManager,
         ShareGroupConfigProvider configProvider,
         SharePartitionListener listener,
-        Supplier<Boolean> shareGroupDlqEnableSupplier
+        Supplier<Boolean> shareGroupDlqEnableSupplier,
+        ShareGroupDLQManager shareGroupDLQManager
     ) {
         this(groupId, topicIdPartition, leaderEpoch, 
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
             timer, time, persister, replicaManager, configProvider, 
SharePartitionState.EMPTY, listener,
-            new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()), shareGroupDlqEnableSupplier);
+            new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()), shareGroupDlqEnableSupplier,
+            shareGroupDLQManager);
     }
 
     // Visible for testing
@@ -376,7 +378,8 @@ public class SharePartition {
         SharePartitionState sharePartitionState,
         SharePartitionListener listener,
         SharePartitionMetrics sharePartitionMetrics,
-        Supplier<Boolean> shareGroupDlqEnableSupplier
+        Supplier<Boolean> shareGroupDlqEnableSupplier,
+        ShareGroupDLQManager shareGroupDLQManager
     ) {
         this.groupId = groupId;
         this.topicIdPartition = topicIdPartition;
@@ -403,6 +406,7 @@ public class SharePartition {
         this.registerGaugeMetrics();
         this.deliveryCompleteCount = new AtomicInteger(0);
         this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+        this.shareGroupDLQManager = shareGroupDLQManager;
     }
 
     /**
@@ -3337,7 +3341,7 @@ public class SharePartition {
     void initiateDLQAndArchive(InFlightState updatedState, long firstOffset,
                                long lastOffset, short deliveryCount, Throwable 
dlqCause) {
         // Step 1: Enqueue to DLQ
-        shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
+        shareGroupDLQManager.enqueue(new ShareGroupDLQRecordParameter(
             groupId, topicIdPartition, firstOffset, lastOffset,
             Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
         )).whenComplete((v1, dlqException) -> {
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 3b4db2162ee..531f5278bc5 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -41,6 +41,7 @@ import 
org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.context.FinalContext;
 import org.apache.kafka.server.share.context.ShareFetchContext;
 import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
@@ -154,6 +155,11 @@ public class SharePartitionManager implements 
AutoCloseable {
      */
     private final Supplier<Boolean> shareGroupDlqEnableSupplier;
 
+    /**
+     * Reference to the DLQ manager implementation.
+     */
+    private final ShareGroupDLQManager shareGroupDLQManager;
+
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -165,7 +171,8 @@ public class SharePartitionManager implements AutoCloseable 
{
         Persister persister,
         ShareGroupConfigProvider configProvider,
         BrokerTopicStats brokerTopicStats,
-        Supplier<Boolean> shareGroupDlqEnableSupplier
+        Supplier<Boolean> shareGroupDlqEnableSupplier,
+        ShareGroupDLQManager shareGroupDLQManager
     ) {
         this(replicaManager,
             time,
@@ -179,10 +186,12 @@ public class SharePartitionManager implements 
AutoCloseable {
             configProvider,
             new ShareGroupMetrics(time),
             brokerTopicStats,
-            shareGroupDlqEnableSupplier
+            shareGroupDlqEnableSupplier,
+            shareGroupDLQManager
         );
     }
 
+    @SuppressWarnings("ParameterNumber")
     private SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -196,7 +205,8 @@ public class SharePartitionManager implements AutoCloseable 
{
         ShareGroupConfigProvider configProvider,
         ShareGroupMetrics shareGroupMetrics,
         BrokerTopicStats brokerTopicStats,
-        Supplier<Boolean> shareGroupDlqEnableSupplier
+        Supplier<Boolean> shareGroupDlqEnableSupplier,
+        ShareGroupDLQManager shareGroupDLQManager
     ) {
         this(replicaManager,
             time,
@@ -212,7 +222,8 @@ public class SharePartitionManager implements AutoCloseable 
{
             configProvider,
             shareGroupMetrics,
             brokerTopicStats,
-            shareGroupDlqEnableSupplier
+            shareGroupDlqEnableSupplier,
+            shareGroupDLQManager
         );
     }
 
@@ -232,7 +243,8 @@ public class SharePartitionManager implements AutoCloseable 
{
             ShareGroupConfigProvider configProvider,
             ShareGroupMetrics shareGroupMetrics,
             BrokerTopicStats brokerTopicStats,
-            Supplier<Boolean> shareGroupDlqEnableSupplier
+            Supplier<Boolean> shareGroupDlqEnableSupplier,
+            ShareGroupDLQManager shareGroupDLQManager
     ) {
         this.replicaManager = replicaManager;
         this.time = time;
@@ -249,6 +261,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         this.brokerTopicStats = brokerTopicStats;
         this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
         this.shareGroupDlqEnableSupplier = shareGroupDlqEnableSupplier;
+        this.shareGroupDLQManager = shareGroupDLQManager;
     }
 
     /**
@@ -733,7 +746,8 @@ public class SharePartitionManager implements AutoCloseable 
{
                             replicaManager,
                             configProvider,
                             listener,
-                            shareGroupDlqEnableSupplier
+                            shareGroupDlqEnableSupplier,
+                            shareGroupDLQManager
                     );
                 });
     }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7a9db7a724..04a1ee7672c 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -62,6 +62,7 @@ import 
org.apache.kafka.server.transaction.AddPartitionsToTxnManager
 import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, 
LogManager => JLogManager}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.apache.kafka.server.partition.{AlterPartitionManager, 
DefaultAlterPartitionManager}
+import org.apache.kafka.server.share.dlq.{DefaultShareGroupDLQManager, 
NoOpShareGroupDLQManager, ShareGroupDLQManager}
 
 import java.time.Duration
 import java.util
@@ -170,6 +171,8 @@ class BrokerServer(
 
   private var shareGroupTimer: Timer = _
 
+  private var shareGroupDLQManager: ShareGroupDLQManager = _
+
   private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): 
Boolean = {
     lock.lock()
     try {
@@ -386,6 +389,9 @@ class BrokerServer(
       /* create persister */
       persister = createShareStatePersister()
 
+      /* create share group DLQ manager */
+      shareGroupDLQManager = createShareGroupDLQManager()
+
       partitionMetadataClient = createPartitionMetadataClient(metadataCache)
 
       groupCoordinator = createGroupCoordinator()
@@ -463,7 +469,8 @@ class BrokerServer(
         persister,
         new ShareGroupConfigProvider(groupConfigManager),
         brokerTopicStats,
-        () => 
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)).supportsShareGroupDLQ()
+        () => 
ShareVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)).supportsShareGroupDLQ(),
+        shareGroupDLQManager
       )
 
       dataPlaneRequestProcessor = new KafkaApis(
@@ -750,6 +757,30 @@ class BrokerServer(
     }
   }
 
+  private def createShareGroupDLQManager(): ShareGroupDLQManager = {
+    if (config.shareGroupConfig.shareGroupDLQManagerClassName.nonEmpty) {
+      val klass = 
Utils.loadClass(config.shareGroupConfig.shareGroupDLQManagerClassName, 
classOf[Object]).asInstanceOf[Class[ShareGroupDLQManager]]
+      if (klass.getName.equals(classOf[DefaultShareGroupDLQManager].getName)) {
+        DefaultShareGroupDLQManager.instance(
+          NetworkUtils.buildNetworkClient("ShareGroupDLQManager", config, 
metrics, Time.SYSTEM, new LogContext(s"[ShareGroupDLQManager 
broker=${config.brokerId}]")),
+          new ShareCoordinatorMetadataCacheHelperImpl(metadataCache, key => 
shareCoordinator.partitionFor(key), config.interBrokerListenerName, 
groupConfigManager),
+          Time.SYSTEM,
+          shareGroupTimer
+        )
+      } else if 
(klass.getName.equals(classOf[NoOpShareGroupDLQManager].getName)) {
+        info("Using no-op share group DLQ manager")
+        new NoOpShareGroupDLQManager()
+      } else {
+        error("Unknown share group DLQ manager specialization specified. 
ShareGroupDLQManager is only factory-pluggable!")
+        throw new IllegalArgumentException("Unknown share group DLQ manager 
specified " + config.shareGroupConfig.shareGroupDLQManagerClassName)
+      }
+    } else {
+      // in case share group DLQ manager class name deliberately empty (key=)
+      info("Using no-op share group DLQ manager")
+      new NoOpShareGroupDLQManager()
+    }
+  }
+
   protected def createRemoteLogManager(listenerInfo: ListenerInfo): 
Option[RemoteLogManager] = {
     if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) {
       val listenerName = 
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
@@ -883,6 +914,9 @@ class BrokerServer(
       if (persister != null)
         Utils.swallow(this.logger.underlying, () => persister.stop())
 
+      if (shareGroupDLQManager != null)
+        Utils.swallow(this.logger.underlying, () => 
shareGroupDLQManager.stop())
+
       Utils.closeQuietly(shareGroupTimer, "share group timer")
 
       if (lifecycleManager != null)
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 45d29fd79b3..d1a0d98eb65 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -64,6 +64,8 @@ import 
org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.context.FinalContext;
 import org.apache.kafka.server.share.context.ShareFetchContext;
 import org.apache.kafka.server.share.context.ShareSessionContext;
+import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
@@ -3249,6 +3251,7 @@ public class SharePartitionManagerTest {
         private ShareGroupMetrics shareGroupMetrics = new 
ShareGroupMetrics(time);
         private BrokerTopicStats brokerTopicStats;
         private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
+        private ShareGroupDLQManager shareGroupDLQManager = new 
NoOpShareGroupDLQManager();
 
         private SharePartitionManagerBuilder withReplicaManager(ReplicaManager 
replicaManager) {
             this.replicaManager = replicaManager;
@@ -3290,6 +3293,11 @@ public class SharePartitionManagerTest {
             return this;
         }
 
+        private SharePartitionManagerBuilder 
withShareGroupDlqManager(ShareGroupDLQManager shareGroupDLQManager) {
+            this.shareGroupDLQManager = shareGroupDLQManager;
+            return this;
+        }
+
         public static SharePartitionManagerBuilder builder() {
             return new SharePartitionManagerBuilder();
         }
@@ -3308,7 +3316,8 @@ public class SharePartitionManagerTest {
                 new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
                 shareGroupMetrics,
                 brokerTopicStats,
-                shareGroupDlqEnableSupplier
+                shareGroupDlqEnableSupplier,
+                shareGroupDLQManager
             );
         }
     }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 06d09556492..61480cd73bb 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -13318,6 +13319,7 @@ public class SharePartitionTest {
         private Time time = MOCK_TIME;
         private SharePartitionMetrics sharePartitionMetrics = 
Mockito.mock(SharePartitionMetrics.class);
         private Supplier<Boolean> shareGroupDlqEnableSupplier = () -> false;
+        private ShareGroupDLQManager shareGroupDLQManager = new 
NoOpShareGroupDLQManager();
 
         private SharePartitionBuilder withMaxInflightRecords(int 
defaultMaxInflightRecords) {
             this.defaultMaxInflightRecords = defaultMaxInflightRecords;
@@ -13369,14 +13371,20 @@ public class SharePartitionTest {
             return this;
         }
 
+        private SharePartitionBuilder 
withShareGroupDlqManager(ShareGroupDLQManager shareGroupDLQManager) {
+            this.shareGroupDLQManager = shareGroupDLQManager;
+            return this;
+        }
+
         public static SharePartitionBuilder builder() {
             return new SharePartitionBuilder();
         }
 
         public SharePartition build() {
             return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, 
defaultMaxInflightRecords, defaultMaxDeliveryCount,
-                    defaultAcquisitionLockTimeoutMs, mockTimer, time, 
persister, replicaManager, configProvider,
-                    state, Mockito.mock(SharePartitionListener.class), 
sharePartitionMetrics, shareGroupDlqEnableSupplier);
+                defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, 
replicaManager, configProvider,
+                state, Mockito.mock(SharePartitionListener.class), 
sharePartitionMetrics, shareGroupDlqEnableSupplier,
+                shareGroupDLQManager);
         }
     }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2c726371853..a409f8f7cb1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1092,6 +1092,7 @@ class KafkaConfigTest {
         case 
GroupCoordinatorConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_PREFIX_CONFIG => // 
ignore string
         case GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG => // ignore 
string
         case GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_boolean")
+        case ShareGroupConfig.SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG => 
//ignore string
 
         /** Streams groups configs */
         case GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index cc72a8b48e3..2ce1cf1f81f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -82,6 +82,11 @@ public class ShareGroupConfig {
     public static final String SHARE_GROUP_PERSISTER_CLASS_NAME_DOC = "The 
fully qualified name of a class which implements " +
         "the <code>org.apache.kafka.server.share.Persister</code> interface.";
 
+    public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG = 
"group.share.dlq.manager.class.name";
+    public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DEFAULT = 
"org.apache.kafka.server.share.dlq.DefaultShareGroupDLQManager";
+    public static final String SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DOC = "The 
fully qualified name of a class which implements " +
+        "the 
<code>org.apache.kafka.server.share.dlq.ShareGroupDLQManager</code> interface.";
+
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
             .define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM, 
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
@@ -94,7 +99,8 @@ public class ShareGroupConfig {
             .define(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT, 
SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 2000), MEDIUM, 
SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DOC)
             .define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT, 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM, 
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
             .define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, 
SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
-            .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
+            .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC)
+            .defineInternal(SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG, STRING, 
SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DEFAULT, null, MEDIUM, 
SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_DOC);
 
     private final int shareGroupPartitionMaxRecordLocks;
     private final int shareGroupMaxPartitionMaxRecordLocks;
@@ -108,6 +114,7 @@ public class ShareGroupConfig {
     private final int shareFetchPurgatoryPurgeIntervalRequests;
     private final int shareGroupMaxShareSessions;
     private final String shareGroupPersisterClassName;
+    private final String shareGroupDLQManagerClassName;
     private final AbstractConfig config;
 
     public ShareGroupConfig(AbstractConfig config) {
@@ -124,6 +131,7 @@ public class ShareGroupConfig {
         shareFetchPurgatoryPurgeIntervalRequests = 
config.getInt(ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG);
         shareGroupMaxShareSessions = 
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG);
         shareGroupPersisterClassName = 
config.getString(ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG);
+        shareGroupDLQManagerClassName = 
config.getString(ShareGroupConfig.SHARE_GROUP_DLQ_MANAGER_CLASS_NAME_CONFIG);
         validate();
     }
 
@@ -186,6 +194,10 @@ public class ShareGroupConfig {
         return shareGroupPersisterClassName;
     }
 
+    public String shareGroupDLQManagerClassName() {
+        return shareGroupDLQManagerClassName;
+    }
+
     private void validate() {
         Utils.require(shareGroupMaxDeliveryCountLimit >= 
shareGroupDeliveryCountLimit,
                 String.format("%s must be greater than or equal to %s",

Reply via email to