This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c48bc49a8fd KAFKA-20245: Add throwables and new record state for share group DLQ. [1/N] (#21785)
c48bc49a8fd is described below

commit c48bc49a8fdf8a3decb43f23ea3288e0bb3e0ef9
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Mar 17 21:32:46 2026 +0530

    KAFKA-20245: Add throwables and new record state for share group DLQ. [1/N] (#21785)
    
    * Introduce the ARCHIVING record state, and add the possible throwables
    to the ShareGroupDLQ interface.
    * The ARCHIVING state will precede ARCHIVED and the transition will
    happen on successful DLQ or failure after max retries.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../org/apache/kafka/server/share/dlq/ShareGroupDLQ.java   | 14 ++++++++++++++
 .../org/apache/kafka/server/share/fetch/RecordState.java   |  2 ++
 2 files changed, 16 insertions(+)

diff --git a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
index 2dec3e914b7..8b56cc64073 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQ.java
@@ -23,6 +23,20 @@ import java.util.concurrent.CompletableFuture;
  * The main interface to identify implementations of dead letter queues for 
share groups.
  */
 public interface ShareGroupDLQ {
+    class ShareGroupDLQThrowable extends Throwable {
+        ShareGroupDLQThrowable(String message) {
+            // We don't want the stack trace to be filled.
+            super(message, null, false, false);
+        }
+    }
+
+    Throwable STALE_BATCH = new ShareGroupDLQThrowable("Offset part of stale 
batch.");
+    Throwable BEHIND_LSO = new ShareGroupDLQThrowable("Offset before LSO.");
+    Throwable ABORTED_TRANSACTION = new ShareGroupDLQThrowable("Offset part of 
aborted transaction.");
+    Throwable CLIENT_REJECT = new ShareGroupDLQThrowable("Offset rejected by 
client.");
+    Throwable DELIVERY_COUNT_EXCEEDED = new ShareGroupDLQThrowable("Offset 
delivery count exceeded the threshold.");
+    Throwable ACQUISITION_LOCK_TIMEOUT = new 
ShareGroupDLQThrowable("Acquisition lock timed out.");
+
     /**
      * Main method exposed to the world to enqueuing a record to the share 
groups dead letter queue.
      *
diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java b/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
index 99547f58456..917944fadd8 100644
--- a/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
+++ b/server/src/main/java/org/apache/kafka/server/share/fetch/RecordState.java
@@ -27,6 +27,7 @@ public enum RecordState {
     AVAILABLE((byte) 0),
     ACQUIRED((byte) 1),
     ACKNOWLEDGED((byte) 2),
+    ARCHIVING((byte) 3),    // Per KIP-1191
     ARCHIVED((byte) 4);
 
     public final byte id;
@@ -71,6 +72,7 @@ public enum RecordState {
             case 0 -> AVAILABLE;
             case 1 -> ACQUIRED;
             case 2 -> ACKNOWLEDGED;
+            case 3 -> ARCHIVING;
             case 4 -> ARCHIVED;
             default -> throw new IllegalArgumentException("Unknown record 
state id: " + id);
         };

Reply via email to