This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b64498ac571 KAFKA-20549: Add logic for create topic in DLQ state
manager. [3/N] (#22284)
b64498ac571 is described below
commit b64498ac571a8a25533c366936772978e73be418
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri May 15 17:44:36 2026 +0530
KAFKA-20549: Add logic for create topic in DLQ state manager. [3/N] (#22284)
* The `ShareGroupDLQStateManager` is responsible for sending various
RPCs to support the DLQ operation.
* In this PR, we have added code to auto-create the DLQ topic if the
appropriate configs are valid.
* Tests will be added once the PRODUCE logic is integrated. The
component is not yet wired into `SharePartition`.
* Minor change: made the factory method in `DefaultShareGroupDLQManager`
static.
Reviewers: Andrew Schofield <[email protected]>
---
.../share/dlq/DefaultShareGroupDLQManager.java | 2 +-
.../share/dlq/ShareGroupDLQStateManager.java | 262 ++++++++++++++++-----
2 files changed, 205 insertions(+), 59 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
index 64361aa0564..c06af86d014 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/DefaultShareGroupDLQManager.java
@@ -35,7 +35,7 @@ public class DefaultShareGroupDLQManager implements
ShareGroupDLQManager {
private static final Logger log =
LoggerFactory.getLogger(DefaultShareGroupDLQManager.class);
- public DefaultShareGroupDLQManager instance(ShareGroupDLQStateManager
stateManager) {
+ public static ShareGroupDLQManager instance(ShareGroupDLQStateManager
stateManager) {
DefaultShareGroupDLQManager instance = new
DefaultShareGroupDLQManager(stateManager);
instance.start();
return instance;
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
index b55996b55a0..1bcfd1f025d 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java
@@ -23,15 +23,20 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.internals.ExponentialBackoffManager;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +61,12 @@ public class ShareGroupDLQStateManager {
private final Time time;
private final Timer timer;
private final ShareGroupDLQMetadataCacheHelper cacheHelper;
+ public static final long REQUEST_BACKOFF_MS = 1_000L;
+ public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+ private static final int MAX_REQUEST_ATTEMPTS = 5;
+ private static final int RETRY_BACKOFF_EXP_BASE =
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
+ private static final double RETRY_BACKOFF_JITTER =
CommonClientConfigs.RETRY_BACKOFF_JITTER;
+ private static final Logger log =
LoggerFactory.getLogger(ShareGroupDLQStateManager.class);
public ShareGroupDLQStateManager(KafkaClient client,
ShareGroupDLQMetadataCacheHelper cacheHelper, Time time, Timer timer) {
if (client == null) {
@@ -89,6 +100,7 @@ public class ShareGroupDLQStateManager {
public void start() {
if (isStarted.compareAndSet(false, true)) {
+ log.info("Starting ShareGroupDLQStateManager");
this.sender.start();
isStarted.set(true);
}
@@ -109,28 +121,87 @@ public class ShareGroupDLQStateManager {
*/
public CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param) {
CompletableFuture<Void> future = new CompletableFuture<>();
- ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future);
- sender.enqueue(requestHandler);
+ ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future, REQUEST_BACKOFF_MS,
REQUEST_BACKOFF_MAX_MS, MAX_REQUEST_ATTEMPTS);
+ enqueue(requestHandler);
return future;
}
- private abstract class ShareGroupDLQStateManagerHandler implements
RequestCompletionHandler {
- private final ShareGroupDLQRecordParameter param;
+ private void enqueue(ProduceRequestHandler requestHandler) {
+ sender.enqueue(requestHandler);
+ }
- ShareGroupDLQStateManagerHandler(ShareGroupDLQRecordParameter param) {
+ private class ProduceRequestHandler implements RequestCompletionHandler {
+ private final CompletableFuture<Void> result;
+ private final ShareGroupDLQRecordParameter param;
+ private static final Logger LOG =
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
+ private final ExponentialBackoffManager createTopicsBackoff;
+
+ public ProduceRequestHandler(
+ ShareGroupDLQRecordParameter param,
+ CompletableFuture<Void> result,
+ long backoffMs,
+ long backoffMaxMs,
+ int maxRPCRetryAttempts
+ ) {
this.param = param;
+ this.result = result;
+ this.createTopicsBackoff = new ExponentialBackoffManager(
+ maxRPCRetryAttempts,
+ backoffMs,
+ RETRY_BACKOFF_EXP_BASE,
+ backoffMaxMs,
+ RETRY_BACKOFF_JITTER
+ );
}
- protected abstract AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder();
+ @Override
+ public void onComplete(ClientResponse response) {
+ // We don't know if FIND_COORD or actual REQUEST. Let's err on
side of request.
+ if (response == null) {
+
result.completeExceptionally(Errors.UNKNOWN_SERVER_ERROR.exception());
+ sender.wakeup();
+ return;
+ }
- protected abstract CompletableFuture<? extends AbstractResponse>
result();
+ if (response.requestHeader().apiKey() == ApiKeys.CREATE_TOPICS) {
+ handleCreateTopicsResponse(response);
+ } else {
+ // handle the response
+ }
- protected abstract String name();
+ sender.wakeup();
+ }
- protected abstract void createTopicErrorResponse(Exception exception);
+ public String name() {
+ return "ProduceRequestHandler";
+ }
- protected AbstractRequest.Builder<CreateTopicsRequest>
createTopicBuilder() {
- return new CreateTopicsRequest.Builder(new
CreateTopicsRequestData());
+ public void requestErrorResponse(Throwable exception) {
+ this.result.completeExceptionally(exception);
+ }
+
+ public AbstractRequest.Builder<CreateTopicsRequest>
createTopicBuilder() throws ConfigException {
+ // Since the configs are dynamic - something might have changed,
so revalidate.
+ Optional<String> dlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
+ if (dlqTopic.isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic is not
configured for share group %s.", param.groupId()));
+ }
+
+ CreateTopicsRequestData.CreatableTopicConfigCollection
topicConfigs = new CreateTopicsRequestData.CreatableTopicConfigCollection();
+ CreateTopicsRequestData.CreatableTopicConfig enableDLQConfig = new
CreateTopicsRequestData.CreatableTopicConfig()
+
.setName(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG)
+ .setValue("true");
+ topicConfigs.add(enableDLQConfig);
+
+ CreateTopicsRequestData.CreatableTopicCollection topicCollection =
new CreateTopicsRequestData.CreatableTopicCollection();
+ topicCollection.add(new CreateTopicsRequestData.CreatableTopic()
+ .setName(dlqTopic.get())
+ .setReplicationFactor((short) -1)
+ .setNumPartitions((short) -1)
+ .setConfigs(topicConfigs));
+
+ return new CreateTopicsRequest.Builder(new
CreateTopicsRequestData()
+ .setTopics(topicCollection));
}
public Optional<Throwable> validateDlqTopic() {
@@ -140,7 +211,7 @@ public class ShareGroupDLQStateManager {
// Verify that DLQ topic for the share group is set and is
correctly named.
if (topicNameOpt.isEmpty()) {
return Optional.of(new
ConfigException(String.format("Configured DLQ topic name in share group: %s is
empty.", param.groupId())));
- } else if (!topicNameOpt.get().startsWith("__")) {
+ } else if (topicNameOpt.get().startsWith("__")) {
return Optional.of(new
ConfigException(String.format("Configured DLQ topic name in share group: %s
cannot start with __, topic: %s.", param.groupId(), topicNameOpt.get())));
}
@@ -165,53 +236,106 @@ public class ShareGroupDLQStateManager {
});
}
- public ShareGroupDLQRecordParameter recordParam() {
- return param;
- }
-
public boolean dlqTopicExists() {
Optional<String> shareGroupDlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
return
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
}
- }
- private class ProduceRequestHandler extends
ShareGroupDLQStateManagerHandler {
- private final CompletableFuture<Void> result;
- private static final Logger LOG =
LoggerFactory.getLogger(ShareGroupDLQStateManager.ProduceRequestHandler.class);
-
- public ProduceRequestHandler(ShareGroupDLQRecordParameter param,
CompletableFuture<Void> result) {
- super(param);
- this.result = result;
- }
-
- @Override
- protected AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder() {
- return null;
- }
-
- @Override
- protected CompletableFuture<? extends AbstractResponse> result() {
- return CompletableFuture.completedFuture(null);
- }
+ // Visibility for testing
+ Optional<Errors> checkResponseError(ClientResponse response) {
+ if (response.hasResponse()) {
+ return Optional.empty();
+ }
- @Override
- protected String name() {
- return "ProduceRequestHandler";
+ String dlqTopicName =
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+
+ LOG.debug("Response for RPC {} with DLQ topic {} is invalid - {}",
name(), dlqTopicName, response);
+
+ if (response.authenticationException() != null) {
+ LOG.error("Authentication exception",
response.authenticationException());
+ Errors error =
Errors.forException(response.authenticationException());
+ return Optional.of(error);
+ } else if (response.versionMismatch() != null) {
+ LOG.error("Version mismatch exception",
response.versionMismatch());
+ Errors error = Errors.forException(response.versionMismatch());
+ return Optional.of(error);
+ } else if (response.wasDisconnected()) { // Retriable
+ return Optional.of(Errors.NETWORK_EXCEPTION);
+ } else if (response.wasTimedOut()) { // Retriable
+ LOG.debug("Response for RPC {} with DLQ topic {} timed out -
{}.", name(), dlqTopicName, response);
+ return Optional.of(Errors.REQUEST_TIMED_OUT);
+ } else {
+ return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
+ }
}
- @Override
- protected void createTopicErrorResponse(Exception exception) {
- this.result.completeExceptionally(exception);
- }
+ private void handleCreateTopicsResponse(ClientResponse response) {
+ LOG.debug("Received CreateTopicsResponse {}", response);
+ createTopicsBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
+ String dlqTopicName =
cacheHelper.shareGroupDlqTopic(param.groupId()).orElse("UNKNOWN");
+
+ switch (clientResponseError) {
+ case NONE:
+ // Topic has been created
+ CreateTopicsResponse createTopicsResponse =
((CreateTopicsResponse) response.responseBody());
+ Optional<CreateTopicsResponseData.CreatableTopicResult>
topicResultOpt = createTopicsResponse.data().topics().stream().findFirst();
+ if (topicResultOpt.isEmpty()) {
+ LOG.error("DLQ topic not found in create topic
response {}.", dlqTopicName);
+
requestErrorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+ break;
+ }
- @Override
- public void onComplete(ClientResponse response) {
+ CreateTopicsResponseData.CreatableTopicResult topicResult
= topicResultOpt.get();
+ Errors error = Errors.forCode(topicResult.errorCode());
+ String errorMessage = topicResult.errorMessage();
+ switch (error) {
+ case NONE:
+ // Replace with enqueue post PRODUCE implementation
+ this.result.complete(null);
+ break;
+
+ case TOPIC_ALREADY_EXISTS:
+ // When the topic creation request was sent, it could
be that a previous request
+ // was in-flight. As such, this request might get a
TOPIC_ALREADY_EXISTS error, which is acceptable;
+ // let it try again and the sender logic will take
care of it.
+ case THROTTLING_QUOTA_EXCEEDED:
+ LOG.debug("Received retriable error in create DLQ
topic response for {} using DLQ topic {}: {}", name(), dlqTopicName,
errorMessage);
+ if (!createTopicsBackoff.canAttempt()) {
+ LOG.error("Exhausted max retries to create DLQ
topic for {} using DLQ topic {} without success.", name(), dlqTopicName);
+ requestErrorResponse(new Exception("Exhausted
max retries to create DLQ topic without success."));
+ break;
+ }
+ timer.add(new
ShareGroupDLQTimerTask(createTopicsBackoff.backOff(), this));
+ break;
+
+ default:
+ LOG.error("Unable to create DLQ topic for {} using
DLQ topic {}: {}.", name(), dlqTopicName, errorMessage);
+ requestErrorResponse(error.exception());
+ }
+ break;
+
+ case NETWORK_EXCEPTION: // Retriable client response error
codes.
+ case REQUEST_TIMED_OUT:
+ LOG.debug("Received retriable error in create topics
client response for {} using DLQ topic {} due to {}.", name(), dlqTopicName,
clientResponseErrorMessage);
+ if (!createTopicsBackoff.canAttempt()) {
+ LOG.error("Exhausted max retries to create DLQ topic
due to error in client response for {} using DLQ topic {}.", name(),
dlqTopicName);
+ requestErrorResponse(clientResponseError.exception());
+ break;
+ }
+ timer.add(new
ShareGroupDLQTimerTask(createTopicsBackoff.backOff(), this));
+ break;
+ default:
+ LOG.error("Unable to create DLQ topic due to error in
client response for {} using DLQ topic {}: {}", name(), dlqTopicName,
clientResponseError.code());
+ requestErrorResponse(clientResponseError.exception());
+ }
}
}
private class SendThread extends InterBrokerSendThread {
- private final
ConcurrentLinkedQueue<ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler>
queue = new ConcurrentLinkedQueue<>();
+ private final
ConcurrentLinkedQueue<ShareGroupDLQStateManager.ProduceRequestHandler> queue =
new ConcurrentLinkedQueue<>();
private final Random random;
SendThread(String name, KafkaClient client, int requestTimeoutMs, Time
time, boolean isInterruptible, Random random) {
@@ -222,7 +346,7 @@ public class ShareGroupDLQStateManager {
@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
if (!queue.isEmpty()) {
- ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler
handler = queue.poll();
+ ShareGroupDLQStateManager.ProduceRequestHandler handler =
queue.poll();
// At this point either a correctly named and configured DLQ
topic exists or
// one is configured but does non-exist. We have already
validated that the
// auto create should be enabled, in that case.
@@ -230,27 +354,34 @@ public class ShareGroupDLQStateManager {
// We need to send RPC to create the topic
Node randomNode = randomNode();
if (randomNode == Node.noNode()) {
- log.error("Unable to find node to use for coordinator
lookup.");
+ log.error("Unable to find node to send create topic
request.");
// fatal failure, cannot retry or progress
// fail the RPC
-
handler.createTopicErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
+
handler.requestErrorResponse(Errors.BROKER_NOT_AVAILABLE.exception());
return List.of();
}
- return List.of(new RequestAndCompletionHandler(
- time.milliseconds(),
- randomNode,
- handler.createTopicBuilder(),
- handler
- ));
+
+ try {
+ AbstractRequest.Builder<CreateTopicsRequest> builder =
handler.createTopicBuilder();
+ return List.of(new RequestAndCompletionHandler(
+ time.milliseconds(),
+ randomNode,
+ builder,
+ handler
+ ));
+ } catch (ConfigException exp) {
+ log.error("Unable to create topic request.", exp);
+
handler.requestErrorResponse(Errors.INVALID_CONFIG.exception());
+ }
}
}
return List.of();
}
- public void
enqueue(ShareGroupDLQStateManager.ShareGroupDLQStateManagerHandler handler) {
+ public void enqueue(ShareGroupDLQStateManager.ProduceRequestHandler
handler) {
Optional<Throwable> exp = handler.validateDlqTopic();
if (exp.isPresent()) {
- handler.result().completeExceptionally(exp.get());
+ handler.requestErrorResponse(exp.get());
return;
}
queue.add(handler);
@@ -265,4 +396,19 @@ public class ShareGroupDLQStateManager {
return nodes.get(random.nextInt(nodes.size()));
}
}
+
+ private final class ShareGroupDLQTimerTask extends TimerTask {
+ private final ProduceRequestHandler handler;
+
+ ShareGroupDLQTimerTask(long delayMs, ProduceRequestHandler handler) {
+ super(delayMs);
+ this.handler = handler;
+ }
+
+ @Override
+ public void run() {
+ sender.enqueue(handler);
+ sender.wakeup();
+ }
+ }
}