This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c48bc49a8fd KAFKA-20245: Add throwables and new record state for share
group DLQ. [1/N] (#21785)
c48bc49a8fd is described below
commit c48bc49a8fdf8a3decb43f23ea3288e0bb3e0ef9
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Mar 17 21:32:46 2026 +0530
KAFKA-20245: Add throwables and new record state for share group DLQ. [1/N]
(#21785)
* Introduce the ARCHIVING record state in RecordState and the possible
throwables in the ShareGroupDLQ interface.
* The ARCHIVING state will precede ARCHIVED; the transition from ARCHIVING
to ARCHIVED will happen once the record is successfully enqueued to the DLQ,
or once enqueuing has failed after the maximum number of retries.
Reviewers: Andrew Schofield <[email protected]>
---
.../org/apache/kafka/server/share/dlq/ShareGroupDLQ.java | 14 ++++++++++++++
.../org/apache/kafka/server/share/fetch/RecordState.java | 2 ++
2 files changed, 16 insertions(+)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
index 2dec3e914b7..8b56cc64073 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
@@ -23,6 +23,20 @@ import java.util.concurrent.CompletableFuture;
* The main interface to identify implementations of dead letter queues for
share groups.
*/
public interface ShareGroupDLQ {
+ class ShareGroupDLQThrowable extends Throwable {
+ ShareGroupDLQThrowable(String message) {
+ // We don't want the stack trace to be filled.
+ super(message, null, false, false);
+ }
+ }
+
+ Throwable STALE_BATCH = new ShareGroupDLQThrowable("Offset part of stale
batch.");
+ Throwable BEHIND_LSO = new ShareGroupDLQThrowable("Offset before LSO.");
+ Throwable ABORTED_TRANSACTION = new ShareGroupDLQThrowable("Offset part of
aborted transaction.");
+ Throwable CLIENT_REJECT = new ShareGroupDLQThrowable("Offset rejected by
client.");
+ Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset
delivery count exceeded the threshold.");
+ Throwable ACQUISITION_LOCK_TIMEOUT = new
ShareGroupDLQThrowable("Acquisition lock timed out.");
+
/**
* Main method exposed to the world to enqueuing a record to the share
groups dead letter queue.
*
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
index 99547f58456..917944fadd8 100644
--- a/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
@@ -27,6 +27,7 @@ public enum RecordState {
AVAILABLE((byte) 0),
ACQUIRED((byte) 1),
ACKNOWLEDGED((byte) 2),
+ ARCHIVING((byte) 3), // Per KIP-1191
ARCHIVED((byte) 4);
public final byte id;
@@ -71,6 +72,7 @@ public enum RecordState {
case 0 -> AVAILABLE;
case 1 -> ACQUIRED;
case 2 -> ACKNOWLEDGED;
+ case 3 -> ARCHIVING;
case 4 -> ARCHIVED;
default -> throw new IllegalArgumentException("Unknown record
state id: " + id);
};