This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bbd81e8d000 KAFKA-20549: Share group DLQ manager implementation.
Skeleton classes [1/N] (#22209)
bbd81e8d000 is described below
commit bbd81e8d000cea9e6a663f8b4a0d656880a82721
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed May 6 01:09:47 2026 +0530
KAFKA-20549: Share group DLQ manager implementation. Skeleton classes [1/N]
(#22209)
* Added the default share group DLQ manager implementation,
`DefaultShareGroupDLQManager`.
* Added a base implementation of the share group DLQ state manager,
`ShareGroupDLQStateManager`.
* Renamed the `ShareGroupDLQ` interface to `ShareGroupDLQManager` and
extended it with a `void stop()` method for cleanup.
* The new classes are standalone at the moment and are not yet plugged
into the rest of the code.
Reviewers: Andrew Schofield <[email protected]>
---
checkstyle/import-control-server-common.xml | 5 +
.../java/kafka/server/share/SharePartition.java | 8 +-
.../share/dlq/DefaultShareGroupDLQManager.java | 108 ++++++++++++++
.../server/share/dlq/NoOpShareGroupDLQManager.java | 9 +-
...hareGroupDLQ.java => ShareGroupDLQManager.java} | 9 +-
.../share/dlq/ShareGroupDLQRecordParameter.java | 2 +-
.../share/dlq/ShareGroupDLQStateManager.java | 160 +++++++++++++++++++++
7 files changed, 292 insertions(+), 9 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml
b/checkstyle/import-control-server-common.xml
index 92da434568e..fd44a4a3f4b 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -113,6 +113,11 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
+ <subpackage name="dlq">
+ <allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.server.util" />
+ <allow pkg="org.apache.kafka.test" />
+ </subpackage>
</subpackage>
<subpackage name="util">
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 4044c94cbcc..7e86a4477a6 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -43,7 +43,7 @@ import
org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
-import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
@@ -332,7 +332,7 @@ public class SharePartition {
/**
* Reference to the dlq manager implementation.
*/
- private final ShareGroupDLQ shareGroupDLQ = new NoOpShareGroupDLQManager();
+ private final ShareGroupDLQManager shareGroupDLQ = new
NoOpShareGroupDLQManager();
/**
* Supplier to toggle dlq support.
@@ -2335,7 +2335,7 @@ public class SharePartition {
// mapping between bytes and record state type. All ack
types have been added except for RENEW which
// has been handled above.
RecordState recordState = recordStateWithDlq(ackType);
- Throwable dlqCause = recordState == RecordState.ARCHIVING
? ShareGroupDLQ.CLIENT_REJECT : null;
+ Throwable dlqCause = recordState == RecordState.ARCHIVING
? ShareGroupDLQManager.CLIENT_REJECT : null;
if (recordState == null) {
return Optional.of(new
IllegalArgumentException("Unknown acknowledge type id: " + ackType));
}
@@ -2416,7 +2416,7 @@ public class SharePartition {
// either released or moved to a state where member id existence
is not important. The member id
// is only important when the batch is acquired.
RecordState recordState = recordStateWithDlq(ackType);
- Throwable dlqCause = recordState == RecordState.ARCHIVING ?
ShareGroupDLQ.CLIENT_REJECT : null;
+ Throwable dlqCause = recordState == RecordState.ARCHIVING ?
ShareGroupDLQManager.CLIENT_REJECT : null;
if (recordState == null) {
return Optional.of(new IllegalArgumentException("Unknown
acknowledge type id: " + ackType));
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
new file mode 100644
index 00000000000..64361aa0564
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default share group DLQ manager responsible for processing
+ * incoming messages and writing them to the appropriate dlq topic.
+ *
+ * Each {@link ShareGroupDLQRecordParameter} is validated here and, when valid,
+ * handed over to the {@link ShareGroupDLQStateManager}. Instances are created
+ * through {@link #instance(ShareGroupDLQStateManager)}, which also starts the
+ * state manager; {@link #stop()} stops it again.
+ */
+public class DefaultShareGroupDLQManager implements ShareGroupDLQManager {
+    /**
+     * Reference to state manager responsible for actually sending
+     * the relevant RPCs and writing records.
+     */
+    private final ShareGroupDLQStateManager stateManager;
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
+
+    /**
+     * Creates a DLQ manager backed by the given state manager and starts it.
+     * The method is static because the constructor is private: a non-static
+     * factory could never be invoked, since no instance could exist to call it on.
+     *
+     * @param stateManager The state manager to delegate enqueued records to.
+     * @return A started {@link DefaultShareGroupDLQManager}.
+     */
+    public static DefaultShareGroupDLQManager instance(ShareGroupDLQStateManager stateManager) {
+        DefaultShareGroupDLQManager instance = new DefaultShareGroupDLQManager(stateManager);
+        instance.start();
+        return instance;
+    }
+
+    private DefaultShareGroupDLQManager(ShareGroupDLQStateManager stateManager) {
+        this.stateManager = stateManager;
+    }
+
+    /**
+     * Starts the underlying state manager.
+     */
+    private void start() {
+        this.stateManager.start();
+    }
+
+    /**
+     * Validates the parameter and forwards it to the state manager.
+     *
+     * @param param Information about the record(s) being dead letter queued.
+     * @return The future returned by the state manager, or a future failed with the
+     *         validation exception when {@code param} is invalid.
+     */
+    @Override
+    public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) {
+        try {
+            validate(param);
+        } catch (Exception e) {
+            log.error("Unable to validate dlq record parameters", e);
+            return CompletableFuture.failedFuture(e);
+        }
+        return stateManager.dlq(param);
+    }
+
+    /**
+     * Stops the underlying state manager. Failures are logged and not propagated,
+     * so that cleanup stays best-effort.
+     */
+    @Override
+    public void stop() {
+        try {
+            if (stateManager != null) {
+                stateManager.stop();
+            }
+        } catch (Exception e) {
+            log.error("Unable to stop DLQ state manager", e);
+        }
+    }
+
+    /**
+     * Checks that the parameter carries a usable group id, topic id partition and offset range.
+     *
+     * @param param The parameter to check.
+     * @throws IllegalArgumentException if any of the checks fails.
+     */
+    private static void validate(ShareGroupDLQRecordParameter param) {
+        String prefix = "DLQ records parameters";
+        if (param == null) {
+            throw new IllegalArgumentException(prefix + " cannot be null.");
+        }
+
+        if (param.groupId() == null || param.groupId().isEmpty()) {
+            throw new IllegalArgumentException(prefix + " group cannot be null or empty.");
+        }
+
+        if (param.topicIdPartition() == null) {
+            throw new IllegalArgumentException(prefix + " topic/partition data cannot be null or empty.");
+        }
+
+        if (param.topicIdPartition().topicId() == null) {
+            throw new IllegalArgumentException(prefix + " topic id data cannot be null or empty.");
+        }
+
+        if (param.topicIdPartition().partition() < 0) {
+            throw new IllegalArgumentException(prefix + " partition cannot be negative.");
+        }
+
+        // The sign checks run before the ordering check. With the ordering check first,
+        // any offsets that got past it and the first-offset check had a non-negative last
+        // offset, so the "last offset cannot be negative" branch could never be reached.
+        if (param.firstOffset() < 0) {
+            throw new IllegalArgumentException(prefix + " first offset cannot be negative.");
+        }
+
+        if (param.lastOffset() < 0) {
+            throw new IllegalArgumentException(prefix + " last offset cannot be negative.");
+        }
+
+        if (param.lastOffset() < param.firstOffset()) {
+            throw new IllegalArgumentException(prefix + " last offset cannot be less than first offset.");
+        }
+    }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
index cd7b971b47b..ea4f5367d1b 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -23,11 +23,11 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
/**
- * A no op implementation of {@link ShareGroupDLQ}. This will be useful
+ * A no op implementation of {@link ShareGroupDLQManager}. This will be useful
* in development cycle and testing. All methods return immediately with
* a successfully completed future.
*/
-public class NoOpShareGroupDLQManager implements ShareGroupDLQ {
+public class NoOpShareGroupDLQManager implements ShareGroupDLQManager {
private static final Logger log =
LoggerFactory.getLogger(NoOpShareGroupDLQManager.class);
@Override
@@ -35,4 +35,9 @@ public class NoOpShareGroupDLQManager implements
ShareGroupDLQ {
log.trace("Enqueuing share group dlq record parameter: {}", param);
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public void stop() {
+ // noop
+ }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java
similarity index 88%
rename from
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
rename to
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java
index 96bf848076c..f320b316009 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQManager.java
@@ -22,7 +22,7 @@ import java.util.concurrent.CompletableFuture;
/**
* The main interface to identify implementations of dead letter queues for
share groups.
*/
-public interface ShareGroupDLQ {
+public interface ShareGroupDLQManager {
class ShareGroupDLQThrowable extends Throwable {
ShareGroupDLQThrowable(String message) {
// We don't want the stack trace to be filled.
@@ -34,11 +34,16 @@ public interface ShareGroupDLQ {
Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset
delivery count exceeded the threshold.");
/**
- * Main method exposed to the world to enqueuing a record to the share
groups dead letter queue.
+ * Main method exposed to the world to enqueue a record to the share
groups dead letter queue.
*
* @param param A java record encapsulating required and optional
information about the kafka record
* being dead letter queued.
* @return A completable future of Void type, mainly to signal exceptions.
*/
CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param);
+
+ /**
+ * Perform cleanup and interrupt any threads.
+ */
+ void stop();
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
index 465279d1229..2e7c728542d 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQRecordParameter.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicIdPartition;
import java.util.Optional;
/**
- * Record representing information needed from callers of {@link
ShareGroupDLQ#enqueue}. Inclusion
+ * Record representing information needed from callers of {@link
ShareGroupDLQManager#enqueue}. Inclusion
* of first and last offset allows passing batch information as well.
*
* @param groupId The share group id of the message being recorded.
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
new file mode 100644
index 00000000000..771c18720a5
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.share.dlq;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import
org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Core implementation of RPC send logic for the dlq manager.
+ * This class allows for enqueuing records meant to be DLQ'ed
+ * and manages various RPC which are to be sent to the KafkaApis.
+ * These RPCs include PRODUCE, CREATE_TOPIC.
+ *
+ * NOTE: this is currently a skeleton. The send thread accepts handlers but
+ * {@code generateRequests()} returns an empty list, and the produce handler
+ * builds no request and reports an already-completed result.
+ */
+public class ShareGroupDLQStateManager {
+    // Guards start()/stop() so that the sender is started and shut down at most once per cycle.
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final SendThread sender;
+    private final Time time;
+    private final Timer timer;
+    private final ShareCoordinatorMetadataCacheHelper cacheHelper;
+
+    /**
+     * The kinds of RPC this state manager is meant to issue.
+     */
+    public enum RPCType {
+        PRODUCE,
+        CREATE_TOPIC
+    }
+
+    /**
+     * Creates the state manager and its send thread. The thread is not started
+     * until {@link #start()} is called.
+     *
+     * @param client      The client handed to the send thread.
+     * @param cacheHelper The metadata cache helper (stored, not yet used by this skeleton).
+     * @param time        The time source; also seeds the send thread's {@link Random}.
+     * @param timer       The timer (stored, not yet used by this skeleton).
+     * @throws IllegalArgumentException if any argument is null.
+     */
+    public ShareGroupDLQStateManager(KafkaClient client, ShareCoordinatorMetadataCacheHelper cacheHelper, Time time, Timer timer) {
+        if (client == null) {
+            throw new IllegalArgumentException("Kafkaclient must not be null.");
+        }
+
+        if (cacheHelper == null) {
+            throw new IllegalArgumentException("Cache helper must not be null.");
+        }
+
+        if (time == null) {
+            throw new IllegalArgumentException("Time must not be null.");
+        }
+
+        if (timer == null) {
+            throw new IllegalArgumentException("Timer must not be null.");
+        }
+
+        this.time = time;
+        this.timer = timer;
+        this.cacheHelper = cacheHelper;
+        this.sender = new SendThread(
+            "ShareGroupDLQSendThread",
+            client,
+            Math.toIntExact(CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS), //30 seconds
+            this.time,
+            true,
+            new Random(this.time.milliseconds())
+        );
+    }
+
+    /**
+     * Starts the send thread if it is not already started. Repeated calls are no-ops.
+     */
+    public void start() {
+        // The successful CAS already records the started state. Writing the flag a second
+        // time after starting the thread could overwrite a concurrent stop() and leave
+        // isStarted == true for a sender that has already been shut down.
+        if (isStarted.compareAndSet(false, true)) {
+            this.sender.start();
+        }
+    }
+
+    /**
+     * Shuts the send thread down if it was started. Repeated calls are no-ops.
+     *
+     * @throws Exception if shutting down the send thread fails.
+     */
+    public void stop() throws Exception {
+        if (isStarted.compareAndSet(true, false)) {
+            this.sender.shutdown();
+        }
+    }
+
+    /**
+     * Enqueues a {@link ShareGroupDLQRecordParameter} based on which records will be DLQ'ed.
+     * The actual record written to the DLQ topic will be built by fetching information from this argument.
+     *
+     * @param param Reference comprising offset information
+     * @return A future completing normally on successful DLQ, exceptionally otherwise.
+     */
+    public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
+        ProduceRequestHandler requestHandler = new ProduceRequestHandler(param);
+        sender.enqueue(requestHandler);
+        // Drop the response payload; callers only observe completion or failure.
+        return requestHandler.result().thenAccept(response -> {
+        });
+    }
+
+    /**
+     * Base type for the handlers queued on the send thread: each one supplies a
+     * request builder, exposes a result future and receives the completion callback.
+     */
+    private abstract class ShareGroupDLQStateManagerHandler implements RequestCompletionHandler {
+        protected abstract AbstractRequest.Builder<? extends AbstractRequest> requestBuilder();
+
+        protected abstract CompletableFuture<? extends AbstractResponse> result();
+    }
+
+    /**
+     * Handler for the PRODUCE RPC. Placeholder implementation: no request is built
+     * and the result is an already-completed future.
+     */
+    private class ProduceRequestHandler extends ShareGroupDLQStateManagerHandler {
+
+        ProduceRequestHandler(ShareGroupDLQRecordParameter param) {
+        }
+
+        @Override
+        protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
+            return null;
+        }
+
+        @Override
+        protected CompletableFuture<? extends AbstractResponse> result() {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+
+        }
+    }
+
+    /**
+     * Send thread holding the queue of pending handlers. Enqueuing wakes the thread up;
+     * request generation is not implemented yet and yields no requests.
+     */
+    private static class SendThread extends InterBrokerSendThread {
+        private final ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler> queue = new ConcurrentLinkedQueue<>();
+        private final Random random;
+
+        SendThread(String name, KafkaClient client, int requestTimeoutMs, Time time, boolean isInterruptible, Random random) {
+            super(name, client, requestTimeoutMs, time, isInterruptible);
+            this.random = random;
+        }
+
+        @Override
+        public Collection<RequestAndCompletionHandler> generateRequests() {
+            return List.of();
+        }
+
+        public void enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
+            queue.add(handler);
+            wakeup();
+        }
+    }
+}