This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b64498ac571 KAFKA-20549: Add logic for create topic in DLQ state 
manager. [3/N] (#22284)
b64498ac571 is described below

commit b64498ac571a8a25533c366936772978e73be418
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri May 15 17:44:36 2026 +0530

    KAFKA-20549: Add logic for create topic in DLQ state manager. [3/N] (#22284)
    
    * The `ShareGroupDLQStateManager` is responsible for sending various
    RPCs to support the DLQ operation.
    * In this PR, we have added code to auto-create the DLQ topic if the
    appropriate configs are valid.
    * Tests will be added when PRODUCE logic is integrated as well. The
    component is not yet stitched with `SharePartition`.
    * Minor change: made the factory method in `DefaultShareGroupDLQManager`
    static.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../share/dlq/DefaultShareGroupDLQManager.java     |   2 +-
 .../share/dlq/ShareGroupDLQStateManager.java       | 262 ++++++++++++++++-----
 2 files changed, 205 insertions(+), 59 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
index 64361aa0564..c06af86d014 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -35,7 +35,7 @@ public class DefaultShareGroupDLQManager implements 
ShareGroupDLQManager {
 
     private static final Logger log = 
LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
 
-    public DefaultShareGroupDLQManager instance(ShareGroupDLQStateManager 
stateManager) {
+    public static ShareGroupDLQManager instance(ShareGroupDLQStateManager 
stateManager) {
         DefaultShareGroupDLQManager instance = new 
DefaultShareGroupDLQManager(stateManager);
         instance.start();
         return instance;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index b55996b55a0..1bcfd1f025d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -23,15 +23,20 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +61,12 @@ public class ShareGroupDLQStateManager {
     private final Time time;
     private final Timer timer;
     private final ShareGroupDLQMetadataCacheHelper cacheHelper;
+    public static final long REQUEST_BACKOFF_MS = 1_000L;
+    public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_REQUEST_ATTEMPTS = 5;
+    private static final int RETRY_BACKOFF_EXP_BASE = 
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
+    private static final double RETRY_BACKOFF_JITTER = 
CommonClientConfigs.RETRY_BACKOFF_JITTER;
+    private static final Logger log = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.class);
 
     public ShareGroupDLQStateManager(KafkaClient client, 
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
         if (client == null) {
@@ -89,6 +100,7 @@ public class ShareGroupDLQStateManager {
 
     public void start() {
         if (isStarted.compareAndSet(false, true)) {
+            log.info("Starting ShareGroupDLQStateManager");
             this.sender.start();
             isStarted.set(true);
         }
@@ -109,28 +121,87 @@ public class ShareGroupDLQStateManager {
      */
     public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param, future);
-        sender.enqueue(requestHandler);
+        ProduceRequestHandler requestHandler = new 
ProduceRequestHandler(param, future, REQUEST_BACKOFF_MS, 
REQUEST_BACKOFF_MAX_MS, MAX_REQUEST_ATTEMPTS);
+        enqueue(requestHandler);
         return future;
     }
 
-    private abstract class ShareGroupDLQStateManagerHandler implements 
RequestCompletionHandler {
-        private final ShareGroupDLQRecordParameter param;
+    private void enqueue(ProduceRequestHandler requestHandler) {
+        sender.enqueue(requestHandler);
+    }
 
-        ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+    private class ProduceRequestHandler implements RequestCompletionHandler {
+        private final CompletableFuture<Void> result;
+        private final ShareGroupDLQRecordParameter param;
+        private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
+        private final ExponentialBackoffManager createTopicsBackoff;
+
+        public ProduceRequestHandler(
+            ShareGroupDLQRecordParameter param,
+            CompletableFuture<Void> result,
+            long backoffMs,
+            long backoffMaxMs,
+            int maxRPCRetryAttempts
+        ) {
             this.param = param;
+            this.result = result;
+            this.createTopicsBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER
+            );
         }
 
-        protected abstract AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder();
+        @Override
+        public void onComplete(ClientResponse response) {
+            // We don't know if FIND_COORD or actual REQUEST. Let's err on 
side of request.
+            if (response == null) {
+                
result.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception());
+                sender.wakeup();
+                return;
+            }
 
-        protected abstract CompletableFuture<? extends AbstractResponse> 
result();
+            if (response.requestHeader().apiKey() == ApiKeys.CREATE_TOPICS) {
+                handleCreateTopicsResponse(response);
+            } else {
+                // handle the response
+            }
 
-        protected abstract String name();
+            sender.wakeup();
+        }
 
-        protected abstract void createTopicErrorResponse(Exception exception);
+        public String name() {
+            return "ProduceRequestHandler";
+        }
 
-        protected AbstractRequest.Builder<CreateTopicsRequest> 
createTopicBuilder() {
-            return new CreateTopicsRequest.Builder(new 
CreateTopicsRequestData());
+        public void requestErrorResponse(Throwable exception) {
+            this.result.completeExceptionally(exception);
+        }
+
+        public AbstractRequest.Builder<CreateTopicsRequest> 
createTopicBuilder() throws ConfigException {
+            // Since the configs are dynamic - something might have changed, 
so revalidate.
+            Optional<String> dlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
+            if (dlqTopic.isEmpty()) {
+                throw new ConfigException(String.format("DLQ topic is not 
configured for share group %s.", param.groupId()));
+            }
+
+            CreateTopicsRequestData.CreatableTopicConfigCollection 
topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+            CreateTopicsRequestData.CreatableTopicConfig enableDLQConfig = new 
CreateTopicsRequestData.CreatableTopicConfig()
+                
.setName(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG)
+                .setValue("true");
+            topicConfigs.add(enableDLQConfig);
+
+            CreateTopicsRequestData.CreatableTopicCollection topicCollection = 
new CreateTopicsRequestData.CreatableTopicCollection();
+            topicCollection.add(new CreateTopicsRequestData.CreatableTopic()
+                .setName(dlqTopic.get())
+                .setReplicationFactor((short) -1)
+                .setNumPartitions((short) -1)
+                .setConfigs(topicConfigs));
+
+            return new CreateTopicsRequest.Builder(new 
CreateTopicsRequestData()
+                .setTopics(topicCollection));
         }
 
         public Optional<Throwable> validateDlqTopic() {
@@ -140,7 +211,7 @@ public class ShareGroupDLQStateManager {
             // Verify that DLQ topic for the share group is set and is 
correctly named.
             if (topicNameOpt.isEmpty()) {
                 return Optional.of(new 
ConfigException(String.format("Configured DLQ topic name in share group: %s is 
empty.", param.groupId())));
-            } else if (!topicNameOpt.get().startsWith("__")) {
+            } else if (topicNameOpt.get().startsWith("__")) {
                 return Optional.of(new 
ConfigException(String.format("Configured DLQ topic name in share group: %s 
cannot start with __, topic: %s.", param.groupId(), topicNameOpt.get())));
             }
 
@@ -165,53 +236,106 @@ public class ShareGroupDLQStateManager {
             });
         }
 
-        public ShareGroupDLQRecordParameter recordParam() {
-            return param;
-        }
-
         public boolean dlqTopicExists() {
             Optional<String> shareGroupDlqTopic = 
cacheHelper.shareGroupDlqTopic(param.groupId());
             return 
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
         }
-    }
 
-    private class ProduceRequestHandler extends 
ShareGroupDLQStateManagerHandler {
-        private final CompletableFuture<Void> result;
-        private static final Logger LOG = 
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
-
-        public ProduceRequestHandler(ShareGroupDLQRecordParameter param, 
CompletableFuture<Void> result) {
-            super(param);
-            this.result = result;
-        }
-
-        @Override
-        protected AbstractRequest.Builder<? extends AbstractRequest> 
requestBuilder() {
-            return null;
-        }
-
-        @Override
-        protected CompletableFuture<? extends AbstractResponse> result() {
-            return CompletableFuture.completedFuture(null);
-        }
+        // Visibility for testing
+        Optional<Errors> checkResponseError(ClientResponse response) {
+            if (response.hasResponse()) {
+                return Optional.empty();
+            }
 
-        @Override
-        protected String name() {
-            return "ProduceRequestHandler";
+            String dlqTopicName = 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+
+            LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}", 
name(), dlqTopicName, response);
+
+            if (response.authenticationException() != null) {
+                LOG.error("Authentication exception", 
response.authenticationException());
+                Errors error = 
Errors.forException(response.authenticationException());
+                return Optional.of(error);
+            } else if (response.versionMismatch() != null) {
+                LOG.error("Version mismatch exception", 
response.versionMismatch());
+                Errors error = Errors.forException(response.versionMismatch());
+                return Optional.of(error);
+            } else if (response.wasDisconnected()) {    // Retriable
+                return Optional.of(Errors.NETWORK_EXCEPTION);
+            } else if (response.wasTimedOut()) {    // Retriable
+                LOG.debug("Response for RPC {} with DLQ topic {} timed out - 
{}.", name(), dlqTopicName, response);
+                return Optional.of(Errors.REQUEST_TIMED_OUT);
+            } else {
+                return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+            }
         }
 
-        @Override
-        protected void createTopicErrorResponse(Exception exception) {
-            this.result.completeExceptionally(exception);
-        }
+        private void handleCreateTopicsResponse(ClientResponse response) {
+            LOG.debug("Received CreateTopicsResponse {}", response);
+            createTopicsBackoff.incrementAttempt();
+            Errors clientResponseError = 
checkResponseError(response).orElse(Errors.NONE);
+            String clientResponseErrorMessage = clientResponseError.message();
+            String dlqTopicName = 
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+
+            switch (clientResponseError) {
+                case NONE:
+                    // Topic has been created
+                    CreateTopicsResponse createTopicsResponse = 
((CreateTopicsResponse) response.responseBody());
+                    Optional<CreateTopicsResponseData.CreatableTopicResult> 
topicResultOpt = createTopicsResponse.data().topics().stream().findFirst();
+                    if (topicResultOpt.isEmpty()) {
+                        LOG.error("DLQ topic not found in create topic 
response {}.", dlqTopicName);
+                        
requestErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+                        break;
+                    }
 
-        @Override
-        public void onComplete(ClientResponse response) {
+                    CreateTopicsResponseData.CreatableTopicResult topicResult 
= topicResultOpt.get();
+                    Errors error = Errors.forCode(topicResult.errorCode());
+                    String errorMessage = topicResult.errorMessage();
+                    switch (error) {
+                        case NONE:
+                            // Replace with enqueue post PRODUCE implementation
+                            this.result.complete(null);
+                            break;
+
+                        case TOPIC_ALREADY_EXISTS:
+                            // When topic creation request was sent, it could 
be that it a previous request
+                            // was in-flight. As such this request might get 
TOPIC_ALREADY_EXISTS error, which is acceptable
+                            // let it try again and sender logic will take 
care of it.
+                        case THROTTLING_QUOTA_EXCEEDED:
+                            LOG.debug("Received retriable error in create DLQ 
topic response for {} using DLQ topic {}: {}", name(), dlqTopicName, 
errorMessage);
+                            if (!createTopicsBackoff.canAttempt()) {
+                                LOG.error("Exhausted max retries to create DLQ 
topic for {} using DLQ topic {} without success.", name(), dlqTopicName);
+                                requestErrorResponse(new Exception("Exhausted 
max retries to create DLQ topic without success."));
+                                break;
+                            }
+                            timer.add(new 
ShareGroupDLQTimerTask(createTopicsBackoff.backOff(), this));
+                            break;
+
+                        default:
+                            LOG.error("Unable to create DLQ topic for {} using 
DLQ topic {}: {}.", name(), dlqTopicName, errorMessage);
+                            requestErrorResponse(error.exception());
+                    }
+                    break;
+
+                case NETWORK_EXCEPTION: // Retriable client response error 
codes.
+                case REQUEST_TIMED_OUT:
+                    LOG.debug("Received retriable error in create topics 
client response for {} using DLQ topic {} due to {}.", name(), dlqTopicName, 
clientResponseErrorMessage);
+                    if (!createTopicsBackoff.canAttempt()) {
+                        LOG.error("Exhausted max retries to create DLQ topic 
due to error in client response for {} using DLQ topic {}.", name(), 
dlqTopicName);
+                        requestErrorResponse(clientResponseError.exception());
+                        break;
+                    }
+                    timer.add(new 
ShareGroupDLQTimerTask(createTopicsBackoff.backOff(), this));
+                    break;
 
+                default:
+                    LOG.error("Unable to create DLQ topic due to error in 
client response for {} using DLQ topic {}: {}", name(), dlqTopicName, 
clientResponseError.code());
+                    requestErrorResponse(clientResponseError.exception());
+            }
         }
     }
 
     private class SendThread extends InterBrokerSendThread {
-        private final 
ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler>
 queue = new ConcurrentLinkedQueue<>();
+        private final 
ConcurrentLinkedQueue<ShareGroupDLQStateManager.ProduceRequestHandler> queue = 
new ConcurrentLinkedQueue<>();
         private final Random random;
 
         SendThread(String name, KafkaClient client, int requestTimeoutMs, Time 
time, boolean isInterruptible, Random random) {
@@ -222,7 +346,7 @@ public class ShareGroupDLQStateManager {
         @Override
         public Collection<RequestAndCompletionHandler> generateRequests() {
             if (!queue.isEmpty()) {
-                ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler 
handler = queue.poll();
+                ShareGroupDLQStateManager.ProduceRequestHandler handler = 
queue.poll();
                 // At this point either a correctly named and configured DLQ 
topic exists or
                 // one is configured but does non-exist. We have already 
validated that the
                 // auto create should be enabled, in that case.
@@ -230,27 +354,34 @@ public class ShareGroupDLQStateManager {
                     // We need to send RPC to create the topic
                     Node randomNode = randomNode();
                     if (randomNode == Node.noNode()) {
-                        log.error("Unable to find node to use for coordinator 
lookup.");
+                        log.error("Unable to find node to send create topic 
request.");
                         // fatal failure, cannot retry or progress
                         // fail the RPC
-                        
handler.createTopicErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
+                        
handler.requestErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
                         return List.of();
                     }
-                    return List.of(new RequestAndCompletionHandler(
-                        time.milliseconds(),
-                        randomNode,
-                        handler.createTopicBuilder(),
-                        handler
-                    ));
+
+                    try {
+                        AbstractRequest.Builder<CreateTopicsRequest> builder = 
handler.createTopicBuilder();
+                        return List.of(new RequestAndCompletionHandler(
+                            time.milliseconds(),
+                            randomNode,
+                            builder,
+                            handler
+                        ));
+                    } catch (ConfigException exp) {
+                        log.error("Unable to create topic request.", exp);
+                        
handler.requestErrorResponse(Errors.INVALID_CONFIG.exception());
+                    }
                 }
             }
             return List.of();
         }
 
-        public void 
enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
+        public void enqueue(ShareGroupDLQStateManager.ProduceRequestHandler 
handler) {
             Optional<Throwable> exp = handler.validateDlqTopic();
             if (exp.isPresent()) {
-                handler.result().completeExceptionally(exp.get());
+                handler.requestErrorResponse(exp.get());
                 return;
             }
             queue.add(handler);
@@ -265,4 +396,19 @@ public class ShareGroupDLQStateManager {
             return nodes.get(random.nextInt(nodes.size()));
         }
     }
+
+    private final class ShareGroupDLQTimerTask extends TimerTask {
+        private final ProduceRequestHandler handler;
+
+        ShareGroupDLQTimerTask(long delayMs, ProduceRequestHandler handler) {
+            super(delayMs);
+            this.handler = handler;
+        }
+
+        @Override
+        public void run() {
+            sender.enqueue(handler);
+            sender.wakeup();
+        }
+    }
 }

Reply via email to